diff options
Diffstat (limited to 'src/events/routes/test.rs')
| -rw-r--r-- | src/events/routes/test.rs | 197 |
1 files changed, 173 insertions, 24 deletions
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index df2d5f6..131c751 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -22,7 +22,9 @@ async fn no_subscriptions() { // Call the endpoint let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { channels: vec![] }; + let query = routes::EventsQuery { + channels: [].into(), + }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await @@ -47,7 +49,7 @@ async fn includes_historical_message() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -56,7 +58,7 @@ async fn includes_historical_message() { // Verify the structure of the response. - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await @@ -78,7 +80,7 @@ async fn includes_live_message() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into(), }; let routes::Events(mut events) = routes::events( State(app.clone()), @@ -95,7 +97,7 @@ async fn includes_live_message() { let sender = fixtures::login::create(&app).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await @@ -121,7 +123,7 @@ async fn excludes_other_channels() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![subscribed.id.clone()], + channels: [subscribed.id.clone()].into(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -130,7 +132,7 @@ async fn excludes_other_channels() { // Verify the semantics - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await @@ -186,9 +188,9 @@ async fn includes_multiple_channels() { .await; for (channel, message) in messages { - assert!(events - .iter() - .any(|event| { event.channel == channel.id && event.message == message })); + assert!(events.iter().any(|routes::ReplayableEvent(_, event)| { + event.channel == channel.id && event.message == message + })); } } @@ -204,7 +206,7 @@ async fn nonexitent_channel() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.clone()], + channels: [channel.clone()].into(), }; let routes::ErrorResponse(error) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -239,7 +241,7 @@ async fn sequential_messages() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into(), }; let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -248,11 +250,13 @@ async fn sequential_messages() { // Verify the structure of the response. - let mut events = events.filter(|event| future::ready(messages.contains(&event.message))); + let mut events = events.filter(|routes::ReplayableEvent(_, event)| { + future::ready(messages.contains(&event.message)) + }); // Verify delivery in order for message in &messages { - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await @@ -283,7 +287,7 @@ async fn resumes_from() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into(), }; let resume_at = { @@ -298,19 +302,20 @@ async fn resumes_from() { .await .expect("subscribed to a valid channel"); - let event = events.next().immediately().await.expect("delivered events"); + let routes::ReplayableEvent(id, event) = + events.next().immediately().await.expect("delivered events"); assert_eq!(channel.id, event.channel); assert_eq!(initial_message, event.message); - event.event_id() + id }; // Resume after disconnect - let resumed_at = fixtures::now(); + let reconnect_at = fixtures::now(); let routes::Events(resumed) = routes::events( State(app), - resumed_at, + reconnect_at, subscriber, Some(resume_at.into()), Query(query), @@ -327,12 +332,156 @@ async fn resumes_from() { .await; for message in later_messages { - assert!(events - .iter() - .any(|event| event.channel == channel.id && event.message == message)); + assert!(events.iter().any( + |routes::ReplayableEvent(_, event)| event.channel == channel.id + && event.message == message + )); } } +// This test verifies a real bug I hit developing the vector-of-sequences +// approach to resuming events. A small omission caused the event IDs in a +// resumed stream to _omit_ channels that were in the original stream until +// those channels also appeared in the resumed stream. +// +// Clients would see something like +// * In the original stream, Cfoo=5,Cbar=8 +// * In the resumed stream, Cfoo=6 (no Cbar sequence number) +// +// Disconnecting and reconnecting a second time, using event IDs from that +// initial period of the first resume attempt, would then cause the second +// resume attempt to restart all other channels from the beginning, and not +// from where the first disconnection happened. +// +// This is a real and valid behaviour for clients! +#[tokio::test] +async fn serial_resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app).await; + let channel_a = fixtures::channel::create(&app).await; + let channel_b = fixtures::channel::create(&app).await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let query = routes::EventsQuery { + channels: [channel_a.id.clone(), channel_b.id.clone()].into(), + }; + + let resume_at = { + let initial_messages = [ + fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, + ]; + + // First subscription + let subscribed_at = fixtures::now(); + let routes::Events(events) = routes::events( + State(app.clone()), + subscribed_at, + subscriber.clone(), + None, + Query(query.clone()), + ) + .await + .expect("subscribed to a valid channel"); + + let events = events + .take(initial_messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + for message in initial_messages { + assert!(events + .iter() + .any(|routes::ReplayableEvent(_, event)| event.message == message)); + } + + let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty"); + + id.to_owned() + }; + + // Resume after disconnect + let resume_at = { + let resume_messages = [ + // Note that channel_b does not appear here. The buggy behaviour + // would be masked if channel_b happened to send a new message + // into the resumed event stream. + fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + ]; + + // Second subscription + let resubscribed_at = fixtures::now(); + let routes::Events(events) = routes::events( + State(app.clone()), + resubscribed_at, + subscriber.clone(), + Some(resume_at.into()), + Query(query.clone()), + ) + .await + .expect("subscribed to a valid channel"); + + let events = events + .take(resume_messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + for message in resume_messages { + assert!(events + .iter() + .any(|routes::ReplayableEvent(_, event)| event.message == message)); + } + + let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty"); + + id.to_owned() + }; + + // Resume after disconnect a second time + { + // At this point, we can send on either channel and demonstrate the + // problem. The resume point should before both of these messages, but + // after _all_ prior messages. + let final_messages = [ + fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, + ]; + + // Second subscription + let resubscribed_at = fixtures::now(); + let routes::Events(events) = routes::events( + State(app.clone()), + resubscribed_at, + subscriber.clone(), + Some(resume_at.into()), + Query(query.clone()), + ) + .await + .expect("subscribed to a valid channel"); + + let events = events + .take(final_messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + // This set of messages, in particular, _should not_ include any prior + // messages from `initial_messages` or `resume_messages`. + for message in final_messages { + assert!(events + .iter() + .any(|routes::ReplayableEvent(_, event)| event.message == message)); + } + }; +} + #[tokio::test] async fn removes_expired_messages() { // Set up the environment @@ -348,7 +497,7 @@ async fn removes_expired_messages() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into_iter().collect(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -357,7 +506,7 @@ async fn removes_expired_messages() { // Verify the semantics - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await |
