summaryrefslogtreecommitdiff
path: root/src/events
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-01 22:43:18 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-01 23:14:49 -0400
commitd171a258ad2119e39cb715f8800031fff16967dc (patch)
tree453cf4c65fa18ff98ef13d9730f1a0f74ff68540 /src/events
parentb8392a5fe824eff46f912a58885546e7b0f37e6f (diff)
Provide a resume point to bridge clients from state snapshots to the event sequence.
Diffstat (limited to 'src/events')
-rw-r--r--src/events/routes.rs11
-rw-r--r--src/events/routes/test.rs67
2 files changed, 56 insertions, 22 deletions
diff --git a/src/events/routes.rs b/src/events/routes.rs
index e3a959f..d81c7fb 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -7,6 +7,7 @@ use axum::{
routing::get,
Router,
};
+use axum_extra::extract::Query;
use futures::stream::{Stream, StreamExt as _};
use super::{extract::LastEventId, types};
@@ -24,12 +25,20 @@ pub fn router() -> Router<App> {
Router::new().route("/api/events", get(events))
}
+#[derive(Default, serde::Deserialize)]
+struct EventsQuery {
+ resume_point: Option<Sequence>,
+}
+
async fn events(
State(app): State<App>,
identity: Identity,
last_event_id: Option<LastEventId<Sequence>>,
+ Query(query): Query<EventsQuery>,
) -> Result<Events<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug>, EventsError> {
- let resume_at = last_event_id.map(LastEventId::into_inner);
+ let resume_at = last_event_id
+ .map(LastEventId::into_inner)
+ .or(query.resume_point);
let stream = app.events().subscribe(resume_at).await?;
let stream = app.logins().limit_stream(identity.token, stream).await?;
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index 1cfca4f..11f01b8 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -1,4 +1,5 @@
use axum::extract::State;
+use axum_extra::extract::Query;
use futures::{
future,
stream::{self, StreamExt as _},
@@ -22,7 +23,7 @@ async fn includes_historical_message() {
let subscriber_creds = fixtures::login::create_with_password(&app).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -49,9 +50,10 @@ async fn includes_live_message() {
let subscriber_creds = fixtures::login::create_with_password(&app).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app.clone()), subscriber, None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) =
+ routes::events(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
// Verify the semantics
@@ -94,7 +96,7 @@ async fn includes_multiple_channels() {
let subscriber_creds = fixtures::login::create_with_password(&app).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -130,7 +132,7 @@ async fn sequential_messages() {
let subscriber_creds = fixtures::login::create_with_password(&app).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -172,9 +174,14 @@ async fn resumes_from() {
let resume_at = {
// First subscription
- let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
let event = events
.filter(fixtures::filter::messages())
@@ -189,9 +196,14 @@ async fn resumes_from() {
};
// Resume after disconnect
- let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into()))
- .await
- .expect("subscribe never fails");
+ let routes::Events(resumed) = routes::events(
+ State(app),
+ subscriber,
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
@@ -242,9 +254,14 @@ async fn serial_resume() {
];
// First subscription
- let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
let events = events
.filter(fixtures::filter::messages())
@@ -277,6 +294,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
+ Query::default(),
)
.await
.expect("subscribe never fails");
@@ -312,6 +330,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
+ Query::default(),
)
.await
.expect("subscribe never fails");
@@ -345,9 +364,10 @@ async fn terminates_on_token_expiry() {
let subscriber =
fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await;
- let routes::Events(events) = routes::events(State(app.clone()), subscriber, None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) =
+ routes::events(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
// Verify the resulting stream's behaviour
@@ -387,9 +407,14 @@ async fn terminates_on_logout() {
let subscriber =
fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the resulting stream's behaviour