use std::future;

use axum::extract::State;
use axum_extra::extract::Query;
use futures::stream::{self, StreamExt as _};

use crate::{
    event::Sequenced as _,
    test::fixtures::{self, future::Expect as _},
};

#[tokio::test]
async fn resume() {
    // Set up the environment
    let app = fixtures::scratch_app().await;
    let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
    let sender = fixtures::user::create(&app, &fixtures::now()).await;
    let resume_point = fixtures::boot::resume_point(&app).await;

    let initial_message =
        fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await;
    let later_messages = vec![
        fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await,
        fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await,
    ];

    // Call the endpoint
    let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
    let resume_at = {
        // First subscription
        let super::Response(events) = super::handler(
            State(app.clone()),
            subscriber.clone(),
            None,
            Query(super::QueryParams { resume_point }),
        )
        .await
        .expect("subscribe never fails");

        let event = events
            .filter_map(fixtures::event::stream::message)
            .filter_map(fixtures::event::stream::message::sent)
            .filter(|event| future::ready(event.message == initial_message))
            .next()
            .expect_some("delivered event for initial message")
            .await;

        event.sequence()
    };

    // Resume after disconnect
    let super::Response(resumed) = super::handler(
        State(app),
        subscriber,
        Some(resume_at.into()),
        Query(super::QueryParams { resume_point }),
    )
    .await
    .expect("subscribe never fails");

    // Verify final events
    let mut events = resumed
        .filter_map(fixtures::event::stream::message)
        .filter_map(fixtures::event::stream::message::sent)
        .zip(stream::iter(later_messages));

    while let Some((event, message)) = events.next().expect_ready("event ready").await {
        assert_eq!(message, event.message);
    }
}

// This test covers a real bug I hit while developing the vector-of-sequences
// approach to resuming events. A small omission caused the event IDs in a
// resumed stream to _omit_ conversations that were in the original stream
// until those conversations also appeared in the resumed stream.
//
// Clients would see something like:
//
//   * In the original stream, Cfoo=5, Cbar=8
//   * In the resumed stream, Cfoo=6 (no Cbar sequence number)
//
// Disconnecting and reconnecting a second time, using event IDs from that
// initial period of the first resume attempt, would then cause the second
// resume attempt to restart all other conversations from the beginning, and
// not from where the first disconnection happened.
//
// As we have switched to a single global event sequence number, this scenario
// can no longer arise, but this test is preserved because the actual behaviour
// _is_ a valid way for clients to behave, and should work. We might as well
// keep testing it.
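//
// As a purely illustrative sketch (not the real types from this crate), a
// vector-of-sequences event ID of the kind described above might have looked
// roughly like:
//
//     // Hypothetical shape, named here only for illustration.
//     struct ResumeToken {
//         // One entry per conversation seen so far, e.g. [(Cfoo, 5), (Cbar, 8)].
//         per_conversation: Vec<(ConversationId, u64)>,
//     }
//
// The omission meant a resumed stream rebuilt `per_conversation` only from
// conversations that had produced events since resuming, so entries such as
// Cbar=8 were dropped until Cbar sent again.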
#[tokio::test]
async fn serial_resume() {
    // Set up the environment
    let app = fixtures::scratch_app().await;
    let sender = fixtures::user::create(&app, &fixtures::now()).await;
    let conversation_a = fixtures::conversation::create(&app, &fixtures::now()).await;
    let conversation_b = fixtures::conversation::create(&app, &fixtures::now()).await;
    let resume_point = fixtures::boot::resume_point(&app).await;

    // Call the endpoint
    let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
    let resume_at = {
        let initial_messages = [
            fixtures::message::send(&app, &conversation_a, &sender, &fixtures::now()).await,
            fixtures::message::send(&app, &conversation_b, &sender, &fixtures::now()).await,
        ];

        // First subscription
        let super::Response(events) = super::handler(
            State(app.clone()),
            subscriber.clone(),
            None,
            Query(super::QueryParams { resume_point }),
        )
        .await
        .expect("subscribe never fails");

        // Check for expected events
        let events = events
            .filter_map(fixtures::event::stream::message)
            .filter_map(fixtures::event::stream::message::sent)
            .zip(stream::iter(initial_messages))
            .collect::<Vec<_>>()
            .expect_ready("zipping a finite list of events is ready immediately")
            .await;
        assert!(
            events
                .iter()
                .all(|(event, message)| message == &event.message)
        );

        let (event, _) = events.last().expect("this vec is non-empty");
        // Take the last one's resume point
        event.sequence()
    };

    // Resume after disconnect
    let resume_at = {
        let resume_messages = [
            // Note that conversation_b does not appear here. The buggy behaviour
            // would be masked if conversation_b happened to send a new message
            // into the resumed event stream.
            fixtures::message::send(&app, &conversation_a, &sender, &fixtures::now()).await,
            fixtures::message::send(&app, &conversation_a, &sender, &fixtures::now()).await,
        ];

        // Second subscription
        let super::Response(events) = super::handler(
            State(app.clone()),
            subscriber.clone(),
            Some(resume_at.into()),
            Query(super::QueryParams { resume_point }),
        )
        .await
        .expect("subscribe never fails");

        // Check for expected events
        let events = events
            .filter_map(fixtures::event::stream::message)
            .filter_map(fixtures::event::stream::message::sent)
            .zip(stream::iter(resume_messages))
            .collect::<Vec<_>>()
            .expect_ready("zipping a finite list of events is ready immediately")
            .await;
        assert!(
            events
                .iter()
                .all(|(event, message)| message == &event.message)
        );

        let (event, _) = events.last().expect("this vec is non-empty");
        // Take the last one's resume point
        event.sequence()
    };

    // Resume after disconnect a second time
    {
        // At this point, we can send on either conversation and demonstrate
        // the problem. The resume point should be before both of these
        // messages, but after _all_ prior messages.
        let final_messages = [
            fixtures::message::send(&app, &conversation_a, &sender, &fixtures::now()).await,
            fixtures::message::send(&app, &conversation_b, &sender, &fixtures::now()).await,
        ];

        // Third subscription
        let super::Response(events) = super::handler(
            State(app.clone()),
            subscriber.clone(),
            Some(resume_at.into()),
            Query(super::QueryParams { resume_point }),
        )
        .await
        .expect("subscribe never fails");

        // Check for expected events
        let events = events
            .filter_map(fixtures::event::stream::message)
            .filter_map(fixtures::event::stream::message::sent)
            .zip(stream::iter(final_messages))
            .collect::<Vec<_>>()
            .expect_ready("zipping a finite list of events is ready immediately")
            .await;
        assert!(
            events
                .iter()
                .all(|(event, message)| message == &event.message)
        );
    };
}