diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-01 22:43:18 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-01 23:14:49 -0400 |
| commit | d171a258ad2119e39cb715f8800031fff16967dc (patch) | |
| tree | 453cf4c65fa18ff98ef13d9730f1a0f74ff68540 /src/events | |
| parent | b8392a5fe824eff46f912a58885546e7b0f37e6f (diff) | |
Provide a resume point to bridge clients from state snapshots to the event sequence.
Diffstat (limited to 'src/events')
| -rw-r--r-- | src/events/routes.rs | 11 | ||||
| -rw-r--r-- | src/events/routes/test.rs | 67 |
2 files changed, 56 insertions, 22 deletions
diff --git a/src/events/routes.rs b/src/events/routes.rs index e3a959f..d81c7fb 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -7,6 +7,7 @@ use axum::{ routing::get, Router, }; +use axum_extra::extract::Query; use futures::stream::{Stream, StreamExt as _}; use super::{extract::LastEventId, types}; @@ -24,12 +25,20 @@ pub fn router() -> Router<App> { Router::new().route("/api/events", get(events)) } +#[derive(Default, serde::Deserialize)] +struct EventsQuery { + resume_point: Option<Sequence>, +} + async fn events( State(app): State<App>, identity: Identity, last_event_id: Option<LastEventId<Sequence>>, + Query(query): Query<EventsQuery>, ) -> Result<Events<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug>, EventsError> { - let resume_at = last_event_id.map(LastEventId::into_inner); + let resume_at = last_event_id + .map(LastEventId::into_inner) + .or(query.resume_point); let stream = app.events().subscribe(resume_at).await?; let stream = app.logins().limit_stream(identity.token, stream).await?; diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index 1cfca4f..11f01b8 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -1,4 +1,5 @@ use axum::extract::State; +use axum_extra::extract::Query; use futures::{ future, stream::{self, StreamExt as _}, @@ -22,7 +23,7 @@ async fn includes_historical_message() { let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; - let routes::Events(events) = routes::events(State(app), subscriber, None) + let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) .await .expect("subscribe never fails"); @@ -49,9 +50,10 @@ async fn includes_live_message() { let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; - let routes::Events(events) = routes::events(State(app.clone()), subscriber, None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = + routes::events(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); // Verify the semantics @@ -94,7 +96,7 @@ async fn includes_multiple_channels() { let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; - let routes::Events(events) = routes::events(State(app), subscriber, None) + let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) .await .expect("subscribe never fails"); @@ -130,7 +132,7 @@ async fn sequential_messages() { let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; - let routes::Events(events) = routes::events(State(app), subscriber, None) + let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) .await .expect("subscribe never fails"); @@ -172,9 +174,14 @@ async fn resumes_from() { let resume_at = { // First subscription - let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = routes::events( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); let event = events .filter(fixtures::filter::messages()) @@ -189,9 +196,14 @@ async fn resumes_from() { }; // Resume after disconnect - let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into())) - .await - .expect("subscribe never fails"); + let routes::Events(resumed) = routes::events( + State(app), + subscriber, + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); // Verify the structure of the response. @@ -242,9 +254,14 @@ async fn serial_resume() { ]; // First subscription - let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = routes::events( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); let events = events .filter(fixtures::filter::messages()) @@ -277,6 +294,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), Some(resume_at.into()), + Query::default(), ) .await .expect("subscribe never fails"); @@ -312,6 +330,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), Some(resume_at.into()), + Query::default(), ) .await .expect("subscribe never fails"); @@ -345,9 +364,10 @@ async fn terminates_on_token_expiry() { let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await; - let routes::Events(events) = routes::events(State(app.clone()), subscriber, None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = + routes::events(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); // Verify the resulting stream's behaviour @@ -387,9 +407,14 @@ async fn terminates_on_logout() { let subscriber = fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await; - let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = routes::events( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); // Verify the resulting stream's behaviour |
