use axum::extract::State; use axum_extra::extract::Query; use futures::{ future, stream::{self, StreamExt as _}, }; use super::get; use crate::{ event::Sequenced as _, test::fixtures::{self, future::Immediately as _}, }; #[tokio::test] async fn includes_historical_message() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; // Call the endpoint let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) .await .expect("subscribe never fails"); // Verify the structure of the response. let event = events .filter_map(fixtures::message::events) .next() .immediately() .await .expect("delivered stored message"); assert!(fixtures::event::message_sent(&event, &message)); } #[tokio::test] async fn includes_live_message() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let get::Response(events) = get::handler(State(app.clone()), subscriber, None, Query::default()) .await .expect("subscribe never fails"); // Verify the semantics let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; let event = events .filter_map(fixtures::message::events) .next() .immediately() .await .expect("delivered live message"); assert!(fixtures::event::message_sent(&event, &message)); } #[tokio::test] async fn includes_multiple_channels() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let channels = [ fixtures::channel::create(&app, &fixtures::now()).await, fixtures::channel::create(&app, &fixtures::now()).await, ]; let messages = stream::iter(channels) .then(|channel| { let app = app.clone(); let sender = sender.clone(); let channel = channel.clone(); async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } }) .collect::>() .await; // Call the endpoint let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) .await .expect("subscribe never fails"); // Verify the structure of the response. let events = events .filter_map(fixtures::message::events) .take(messages.len()) .collect::>() .immediately() .await; for message in &messages { assert!(events .iter() .any(|event| fixtures::event::message_sent(event, message))); } } #[tokio::test] async fn sequential_messages() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let messages = vec![ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; // Call the endpoint let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) .await .expect("subscribe never fails"); // Verify the structure of the response. let mut events = events .filter_map(fixtures::message::events) .filter(|event| { future::ready( messages .iter() .any(|message| fixtures::event::message_sent(event, message)), ) }); // Verify delivery in order for message in &messages { let event = events .next() .immediately() .await .expect("undelivered messages remaining"); assert!(fixtures::event::message_sent(&event, message)); } } #[tokio::test] async fn resumes_from() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; let later_messages = vec![ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; // Call the endpoint let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let resume_at = { // First subscription let get::Response(events) = get::handler( State(app.clone()), subscriber.clone(), None, Query::default(), ) .await .expect("subscribe never fails"); let event = events .filter_map(fixtures::message::events) .next() .immediately() .await .expect("delivered events"); assert!(fixtures::event::message_sent(&event, &initial_message)); event.sequence() }; // Resume after disconnect let get::Response(resumed) = get::handler( State(app), subscriber, Some(resume_at.into()), Query::default(), ) .await .expect("subscribe never fails"); // Verify the structure of the response. let events = resumed .filter_map(fixtures::message::events) .take(later_messages.len()) .collect::>() .immediately() .await; for message in &later_messages { assert!(events .iter() .any(|event| fixtures::event::message_sent(event, message))); } } // This test verifies a real bug I hit developing the vector-of-sequences // approach to resuming events. A small omission caused the event IDs in a // resumed stream to _omit_ channels that were in the original stream until // those channels also appeared in the resumed stream. // // Clients would see something like // * In the original stream, Cfoo=5,Cbar=8 // * In the resumed stream, Cfoo=6 (no Cbar sequence number) // // Disconnecting and reconnecting a second time, using event IDs from that // initial period of the first resume attempt, would then cause the second // resume attempt to restart all other channels from the beginning, and not // from where the first disconnection happened. // // This is a real and valid behaviour for clients! #[tokio::test] async fn serial_resume() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let resume_at = { let initial_messages = [ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, ]; // First subscription let get::Response(events) = get::handler( State(app.clone()), subscriber.clone(), None, Query::default(), ) .await .expect("subscribe never fails"); let events = events .filter_map(fixtures::message::events) .take(initial_messages.len()) .collect::>() .immediately() .await; for message in &initial_messages { assert!(events .iter() .any(|event| fixtures::event::message_sent(event, message))); } let event = events.last().expect("this vec is non-empty"); event.sequence() }; // Resume after disconnect let resume_at = { let resume_messages = [ // Note that channel_b does not appear here. The buggy behaviour // would be masked if channel_b happened to send a new message // into the resumed event stream. fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, ]; // Second subscription let get::Response(events) = get::handler( State(app.clone()), subscriber.clone(), Some(resume_at.into()), Query::default(), ) .await .expect("subscribe never fails"); let events = events .filter_map(fixtures::message::events) .take(resume_messages.len()) .collect::>() .immediately() .await; for message in &resume_messages { assert!(events .iter() .any(|event| fixtures::event::message_sent(event, message))); } let event = events.last().expect("this vec is non-empty"); event.sequence() }; // Resume after disconnect a second time { // At this point, we can send on either channel and demonstrate the // problem. The resume point should before both of these messages, but // after _all_ prior messages. let final_messages = [ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, ]; // Third subscription let get::Response(events) = get::handler( State(app.clone()), subscriber.clone(), Some(resume_at.into()), Query::default(), ) .await .expect("subscribe never fails"); let events = events .filter_map(fixtures::message::events) .take(final_messages.len()) .collect::>() .immediately() .await; // This set of messages, in particular, _should not_ include any prior // messages from `initial_messages` or `resume_messages`. for message in &final_messages { assert!(events .iter() .any(|event| fixtures::event::message_sent(event, message))); } }; } #[tokio::test] async fn terminates_on_token_expiry() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; // Subscribe via the endpoint let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await; let get::Response(events) = get::handler(State(app.clone()), subscriber, None, Query::default()) .await .expect("subscribe never fails"); // Verify the resulting stream's behaviour app.tokens() .expire(&fixtures::now()) .await .expect("expiring tokens succeeds"); // These should not be delivered. let messages = [ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; assert!(events .filter_map(fixtures::message::events) .filter(|event| future::ready( messages .iter() .any(|message| fixtures::event::message_sent(event, message)) )) .next() .immediately() .await .is_none()); } #[tokio::test] async fn terminates_on_logout() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; // Subscribe via the endpoint let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber_token = fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::now()).await; let subscriber = fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await; let get::Response(events) = get::handler( State(app.clone()), subscriber.clone(), None, Query::default(), ) .await .expect("subscribe never fails"); // Verify the resulting stream's behaviour app.tokens() .logout(&subscriber.token) .await .expect("expiring tokens succeeds"); // These should not be delivered. let messages = [ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; assert!(events .filter_map(fixtures::message::events) .filter(|event| future::ready( messages .iter() .any(|message| fixtures::event::message_sent(event, message)) )) .next() .immediately() .await .is_none()); }