From 4e3d5ccac99b24934c972e088cd7eb02bb95df06 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 17 Jun 2025 02:11:45 -0400 Subject: Handlers are _named operations_, which can be exposed via routes. Each domain module that exposes handlers does so through a `handlers` child module, ideally as a top-level symbol that can be plugged directly into Axum's `MethodRouter`. Modules could make exceptions to this - kill the doctrinaire inside yourself, after all - but none of the API modules that actually exist need such exceptions, and consistency is useful. The related details of request types, URL types, response types, errors, &c &c are then organized into modules under `handlers`, along with their respective tests. --- src/event/routes/test/resume.rs | 227 ---------------------------------------- 1 file changed, 227 deletions(-) delete mode 100644 src/event/routes/test/resume.rs (limited to 'src/event/routes/test/resume.rs') diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs deleted file mode 100644 index 633eae3..0000000 --- a/src/event/routes/test/resume.rs +++ /dev/null @@ -1,227 +0,0 @@ -use std::future; - -use axum::extract::State; -use axum_extra::extract::Query; -use futures::stream::{self, StreamExt as _}; - -use crate::{ - event::{Sequenced as _, routes::get}, - test::fixtures::{self, future::Expect as _}, -}; - -#[tokio::test] -async fn resume() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - let later_messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let resume_at = { - // First subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - let event = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .filter(|event| future::ready(event.message == initial_message)) - .next() - .expect_some("delivered event for initial message") - .await; - - event.sequence() - }; - - // Resume after disconnect - let get::Response(resumed) = get::handler( - State(app), - subscriber, - Some(resume_at.into()), - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Verify final events - - let mut events = resumed - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .zip(stream::iter(later_messages)); - - while let Some((event, message)) = events.next().expect_ready("event ready").await { - assert_eq!(message, event.message); - } -} - -// This test verifies a real bug I hit developing the vector-of-sequences -// approach to resuming events. A small omission caused the event IDs in a -// resumed stream to _omit_ channels that were in the original stream until -// those channels also appeared in the resumed stream. -// -// Clients would see something like -// * In the original stream, Cfoo=5,Cbar=8 -// * In the resumed stream, Cfoo=6 (no Cbar sequence number) -// -// Disconnecting and reconnecting a second time, using event IDs from that -// initial period of the first resume attempt, would then cause the second -// resume attempt to restart all other channels from the beginning, and not -// from where the first disconnection happened. -// -// As we have switched to a single global event sequence number, this scenario -// can no longer arise, but this test is preserved because the actual behaviour -// _is_ a valid way for clients to behave, and should work. We might as well -// keep testing it. -#[tokio::test] -async fn serial_resume() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; - let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let resume_at = { - let initial_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, - ]; - - // First subscription - - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expected events - - let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .zip(stream::iter(initial_messages)) - .collect::>() - .expect_ready("zipping a finite list of events is ready immediately") - .await; - - assert!( - events - .iter() - .all(|(event, message)| message == &event.message) - ); - - let (event, _) = events.last().expect("this vec is non-empty"); - - // Take the last one's resume point - - event.sequence() - }; - - // Resume after disconnect - let resume_at = { - let resume_messages = [ - // Note that channel_b does not appear here. The buggy behaviour - // would be masked if channel_b happened to send a new message - // into the resumed event stream. - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - ]; - - // Second subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - Some(resume_at.into()), - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expected events - - let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .zip(stream::iter(resume_messages)) - .collect::>() - .expect_ready("zipping a finite list of events is ready immediately") - .await; - - assert!( - events - .iter() - .all(|(event, message)| message == &event.message) - ); - - let (event, _) = events.last().expect("this vec is non-empty"); - - // Take the last one's resume point - - event.sequence() - }; - - // Resume after disconnect a second time - { - // At this point, we can send on either channel and demonstrate the - // problem. The resume point should before both of these messages, but - // after _all_ prior messages. - let final_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, - ]; - - // Third subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - Some(resume_at.into()), - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expected events - - let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .zip(stream::iter(final_messages)) - .collect::>() - .expect_ready("zipping a finite list of events is ready immediately") - .await; - - assert!( - events - .iter() - .all(|(event, message)| message == &event.message) - ); - }; -} -- cgit v1.2.3