From 4e3d5ccac99b24934c972e088cd7eb02bb95df06 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 17 Jun 2025 02:11:45 -0400 Subject: Handlers are _named operations_, which can be exposed via routes. Each domain module that exposes handlers does so through a `handlers` child module, ideally as a top-level symbol that can be plugged directly into Axum's `MethodRouter`. Modules could make exceptions to this - kill the doctrinaire inside yourself, after all - but none of the API modules that actually exist need such exceptions, and consistency is useful. The related details of request types, URL types, response types, errors, &c &c are then organized into modules under `handlers`, along with their respective tests. --- src/event/handlers/stream/test/resume.rs | 227 +++++++++++++++++++++++++++++++ 1 file changed, 227 insertions(+) create mode 100644 src/event/handlers/stream/test/resume.rs (limited to 'src/event/handlers/stream/test/resume.rs') diff --git a/src/event/handlers/stream/test/resume.rs b/src/event/handlers/stream/test/resume.rs new file mode 100644 index 0000000..34fee4d --- /dev/null +++ b/src/event/handlers/stream/test/resume.rs @@ -0,0 +1,227 @@ +use std::future; + +use axum::extract::State; +use axum_extra::extract::Query; +use futures::stream::{self, StreamExt as _}; + +use crate::{ + event::Sequenced as _, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + let later_messages = vec![ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let resume_at = { + // First subscription + let super::Response(events) = super::handler( + State(app.clone()), + subscriber.clone(), + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + let event = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(event.message == initial_message)) + .next() + .expect_some("delivered event for initial message") + .await; + + event.sequence() + }; + + // Resume after disconnect + let super::Response(resumed) = super::handler( + State(app), + subscriber, + Some(resume_at.into()), + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify final events + + let mut events = resumed + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(later_messages)); + + while let Some((event, message)) = events.next().expect_ready("event ready").await { + assert_eq!(message, event.message); + } +} + +// This test verifies a real bug I hit developing the vector-of-sequences +// approach to resuming events. A small omission caused the event IDs in a +// resumed stream to _omit_ channels that were in the original stream until +// those channels also appeared in the resumed stream. +// +// Clients would see something like +// * In the original stream, Cfoo=5,Cbar=8 +// * In the resumed stream, Cfoo=6 (no Cbar sequence number) +// +// Disconnecting and reconnecting a second time, using event IDs from that +// initial period of the first resume attempt, would then cause the second +// resume attempt to restart all other channels from the beginning, and not +// from where the first disconnection happened. +// +// As we have switched to a single global event sequence number, this scenario +// can no longer arise, but this test is preserved because the actual behaviour +// _is_ a valid way for clients to behave, and should work. We might as well +// keep testing it. +#[tokio::test] +async fn serial_resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; + let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let resume_at = { + let initial_messages = [ + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + ]; + + // First subscription + + let super::Response(events) = super::handler( + State(app.clone()), + subscriber.clone(), + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(initial_messages)) + .collect::>() + .expect_ready("zipping a finite list of events is ready immediately") + .await; + + assert!( + events + .iter() + .all(|(event, message)| message == &event.message) + ); + + let (event, _) = events.last().expect("this vec is non-empty"); + + // Take the last one's resume point + + event.sequence() + }; + + // Resume after disconnect + let resume_at = { + let resume_messages = [ + // Note that channel_b does not appear here. The buggy behaviour + // would be masked if channel_b happened to send a new message + // into the resumed event stream. + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + ]; + + // Second subscription + let super::Response(events) = super::handler( + State(app.clone()), + subscriber.clone(), + Some(resume_at.into()), + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(resume_messages)) + .collect::>() + .expect_ready("zipping a finite list of events is ready immediately") + .await; + + assert!( + events + .iter() + .all(|(event, message)| message == &event.message) + ); + + let (event, _) = events.last().expect("this vec is non-empty"); + + // Take the last one's resume point + + event.sequence() + }; + + // Resume after disconnect a second time + { + // At this point, we can send on either channel and demonstrate the + // problem. The resume point should before both of these messages, but + // after _all_ prior messages. + let final_messages = [ + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + ]; + + // Third subscription + let super::Response(events) = super::handler( + State(app.clone()), + subscriber.clone(), + Some(resume_at.into()), + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(final_messages)) + .collect::>() + .expect_ready("zipping a finite list of events is ready immediately") + .await; + + assert!( + events + .iter() + .all(|(event, message)| message == &event.message) + ); + }; +} -- cgit v1.2.3