summaryrefslogtreecommitdiff
path: root/src/event/routes/test/resume.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/routes/test/resume.rs')
-rw-r--r--src/event/routes/test/resume.rs220
1 files changed, 220 insertions, 0 deletions
diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs
new file mode 100644
index 0000000..c393d38
--- /dev/null
+++ b/src/event/routes/test/resume.rs
@@ -0,0 +1,220 @@
+use std::future;
+
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::stream::{self, StreamExt as _};
+
+use crate::{
+ event::{routes::get, Sequenced as _},
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn resume() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ let later_messages = vec![
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let resume_at = {
+ // First subscription
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ let event = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .filter(|event| future::ready(event.message == initial_message))
+ .next()
+ .immediately()
+ .await
+ .expect("delivered events");
+
+ event.sequence()
+ };
+
+ // Resume after disconnect
+ let get::Response(resumed) = get::handler(
+ State(app),
+ subscriber,
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify final events
+
+ let mut events = resumed
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .zip(stream::iter(later_messages));
+
+ while let Some((event, message)) = events.next().immediately().await {
+ assert_eq!(message, event.message);
+ }
+}
+
+// This test verifies a real bug I hit developing the vector-of-sequences
+// approach to resuming events. A small omission caused the event IDs in a
+// resumed stream to _omit_ channels that were in the original stream until
+// those channels also appeared in the resumed stream.
+//
+// Clients would see something like
+// * In the original stream, Cfoo=5,Cbar=8
+// * In the resumed stream, Cfoo=6 (no Cbar sequence number)
+//
+// Disconnecting and reconnecting a second time, using event IDs from that
+// initial period of the first resume attempt, would then cause the second
+// resume attempt to restart all other channels from the beginning, and not
+// from where the first disconnection happened.
+//
+// As we have switched to a single global event sequence number, this scenario
+// can no longer arise, but this test is preserved because the actual behaviour
+// _is_ a valid way for clients to behave, and should work. We might as well
+// keep testing it.
+#[tokio::test]
+async fn serial_resume() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
+ let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let resume_at = {
+ let initial_messages = [
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
+ ];
+
+ // First subscription
+
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expected events
+
+ let events = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .zip(stream::iter(initial_messages))
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ assert!(events
+ .iter()
+ .all(|(event, message)| message == &event.message));
+
+ let (event, _) = events.last().expect("this vec is non-empty");
+
+ // Take the last one's resume point
+
+ event.sequence()
+ };
+
+ // Resume after disconnect
+ let resume_at = {
+ let resume_messages = [
+ // Note that channel_b does not appear here. The buggy behaviour
+ // would be masked if channel_b happened to send a new message
+ // into the resumed event stream.
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ ];
+
+ // Second subscription
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expected events
+
+ let events = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .zip(stream::iter(resume_messages))
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ assert!(events
+ .iter()
+ .all(|(event, message)| message == &event.message));
+
+ let (event, _) = events.last().expect("this vec is non-empty");
+
+ // Take the last one's resume point
+
+ event.sequence()
+ };
+
+ // Resume after disconnect a second time
+ {
+ // At this point, we can send on either channel and demonstrate the
+ // problem. The resume point should before both of these messages, but
+ // after _all_ prior messages.
+ let final_messages = [
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
+ ];
+
+ // Third subscription
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expected events
+
+ let events = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .zip(stream::iter(final_messages))
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ assert!(events
+ .iter()
+ .all(|(event, message)| message == &event.message));
+ };
+}