diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-24 22:37:22 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-24 22:37:22 -0400 |
| commit | 0bb17bd01640492db2685e67bacac12dd54a9f59 (patch) | |
| tree | 5c8425b8e6a962e609f82a7b3f2498524d8d6c6f /src/event/routes/test/resume.rs | |
| parent | d97f8ac1858fbd46b77b541a612cc37b07fc0b5d (diff) | |
Tests for channel, invite, setup, and message deletion events.
This also found a bug! No live event was being emitted during invite accept. The only way to find out about invites was to reconnect.
Diffstat (limited to 'src/event/routes/test/resume.rs')
| -rw-r--r-- | src/event/routes/test/resume.rs | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs new file mode 100644 index 0000000..c393d38 --- /dev/null +++ b/src/event/routes/test/resume.rs @@ -0,0 +1,220 @@ +use std::future; + +use axum::extract::State; +use axum_extra::extract::Query; +use futures::stream::{self, StreamExt as _}; + +use crate::{ + event::{routes::get, Sequenced as _}, + test::fixtures::{self, future::Immediately as _}, +}; + +#[tokio::test] +async fn resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + + let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + let later_messages = vec![ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let resume_at = { + // First subscription + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); + + let event = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .filter(|event| future::ready(event.message == initial_message)) + .next() + .immediately() + .await + .expect("delivered events"); + + event.sequence() + }; + + // Resume after disconnect + let get::Response(resumed) = get::handler( + State(app), + subscriber, + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Verify final events + + let mut events = resumed + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .zip(stream::iter(later_messages)); + + while let Some((event, message)) = events.next().immediately().await { + assert_eq!(message, event.message); + } +} + +// This test verifies a real bug I hit developing the vector-of-sequences +// approach to resuming events. A small omission caused the event IDs in a +// resumed stream to _omit_ channels that were in the original stream until +// those channels also appeared in the resumed stream. +// +// Clients would see something like +// * In the original stream, Cfoo=5,Cbar=8 +// * In the resumed stream, Cfoo=6 (no Cbar sequence number) +// +// Disconnecting and reconnecting a second time, using event IDs from that +// initial period of the first resume attempt, would then cause the second +// resume attempt to restart all other channels from the beginning, and not +// from where the first disconnection happened. +// +// As we have switched to a single global event sequence number, this scenario +// can no longer arise, but this test is preserved because the actual behaviour +// _is_ a valid way for clients to behave, and should work. We might as well +// keep testing it. +#[tokio::test] +async fn serial_resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; + let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let resume_at = { + let initial_messages = [ + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + ]; + + // First subscription + + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .zip(stream::iter(initial_messages)) + .collect::<Vec<_>>() + .immediately() + .await; + + assert!(events + .iter() + .all(|(event, message)| message == &event.message)); + + let (event, _) = events.last().expect("this vec is non-empty"); + + // Take the last one's resume point + + event.sequence() + }; + + // Resume after disconnect + let resume_at = { + let resume_messages = [ + // Note that channel_b does not appear here. The buggy behaviour + // would be masked if channel_b happened to send a new message + // into the resumed event stream. + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + ]; + + // Second subscription + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .zip(stream::iter(resume_messages)) + .collect::<Vec<_>>() + .immediately() + .await; + + assert!(events + .iter() + .all(|(event, message)| message == &event.message)); + + let (event, _) = events.last().expect("this vec is non-empty"); + + // Take the last one's resume point + + event.sequence() + }; + + // Resume after disconnect a second time + { + // At this point, we can send on either channel and demonstrate the + // problem. The resume point should before both of these messages, but + // after _all_ prior messages. + let final_messages = [ + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + ]; + + // Third subscription + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .zip(stream::iter(final_messages)) + .collect::<Vec<_>>() + .immediately() + .await; + + assert!(events + .iter() + .all(|(event, message)| message == &event.message)); + }; +} |
