use axum::extract::State; use axum_extra::extract::Query; use futures::{ future, stream::{self, StreamExt as _}, }; use crate::{ event::routes, test::fixtures::{self, future::Immediately as _}, }; #[tokio::test] async fn includes_historical_message() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; // Call the endpoint let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) .await .expect("subscribe never fails"); // Verify the structure of the response. let event = events .filter(fixtures::filter::messages()) .next() .immediately() .await .expect("delivered stored message"); assert_eq!(message, event); } #[tokio::test] async fn includes_live_message() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let routes::Events(events) = routes::events(State(app.clone()), subscriber, None, Query::default()) .await .expect("subscribe never fails"); // Verify the semantics let sender = fixtures::login::create(&app).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; let event = events .filter(fixtures::filter::messages()) .next() .immediately() .await .expect("delivered live message"); assert_eq!(message, event); } #[tokio::test] async fn includes_multiple_channels() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; let channels = [ fixtures::channel::create(&app, &fixtures::now()).await, fixtures::channel::create(&app, &fixtures::now()).await, ]; let messages = stream::iter(channels) .then(|channel| { let app = app.clone(); let sender = sender.clone(); let channel = channel.clone(); async move { fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await } }) .collect::>() .await; // Call the endpoint let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) .await .expect("subscribe never fails"); // Verify the structure of the response. let events = events .filter(fixtures::filter::messages()) .take(messages.len()) .collect::>() .immediately() .await; for message in &messages { assert!(events.iter().any(|event| { event == message })); } } #[tokio::test] async fn sequential_messages() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; let messages = vec![ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, ]; // Call the endpoint let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) .await .expect("subscribe never fails"); // Verify the structure of the response. let mut events = events.filter(|event| future::ready(messages.contains(event))); // Verify delivery in order for message in &messages { let event = events .next() .immediately() .await .expect("undelivered messages remaining"); assert_eq!(message, &event); } } #[tokio::test] async fn resumes_from() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; let later_messages = vec![ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, ]; // Call the endpoint let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let resume_at = { // First subscription let routes::Events(events) = routes::events( State(app.clone()), subscriber.clone(), None, Query::default(), ) .await .expect("subscribe never fails"); let event = events .filter(fixtures::filter::messages()) .next() .immediately() .await .expect("delivered events"); assert_eq!(initial_message, event); event.sequence }; // Resume after disconnect let routes::Events(resumed) = routes::events( State(app), subscriber, Some(resume_at.into()), Query::default(), ) .await .expect("subscribe never fails"); // Verify the structure of the response. let events = resumed .take(later_messages.len()) .collect::>() .immediately() .await; for message in &later_messages { assert!(events.iter().any(|event| event == message)); } } // This test verifies a real bug I hit developing the vector-of-sequences // approach to resuming events. A small omission caused the event IDs in a // resumed stream to _omit_ channels that were in the original stream until // those channels also appeared in the resumed stream. // // Clients would see something like // * In the original stream, Cfoo=5,Cbar=8 // * In the resumed stream, Cfoo=6 (no Cbar sequence number) // // Disconnecting and reconnecting a second time, using event IDs from that // initial period of the first resume attempt, would then cause the second // resume attempt to restart all other channels from the beginning, and not // from where the first disconnection happened. // // This is a real and valid behaviour for clients! #[tokio::test] async fn serial_resume() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let resume_at = { let initial_messages = [ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, ]; // First subscription let routes::Events(events) = routes::events( State(app.clone()), subscriber.clone(), None, Query::default(), ) .await .expect("subscribe never fails"); let events = events .filter(fixtures::filter::messages()) .take(initial_messages.len()) .collect::>() .immediately() .await; for message in &initial_messages { assert!(events.iter().any(|event| event == message)); } let event = events.last().expect("this vec is non-empty"); event.sequence }; // Resume after disconnect let resume_at = { let resume_messages = [ // Note that channel_b does not appear here. The buggy behaviour // would be masked if channel_b happened to send a new message // into the resumed event stream. fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, ]; // Second subscription let routes::Events(events) = routes::events( State(app.clone()), subscriber.clone(), Some(resume_at.into()), Query::default(), ) .await .expect("subscribe never fails"); let events = events .filter(fixtures::filter::messages()) .take(resume_messages.len()) .collect::>() .immediately() .await; for message in &resume_messages { assert!(events.iter().any(|event| event == message)); } let event = events.last().expect("this vec is non-empty"); event.sequence }; // Resume after disconnect a second time { // At this point, we can send on either channel and demonstrate the // problem. The resume point should before both of these messages, but // after _all_ prior messages. let final_messages = [ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, ]; // Third subscription let routes::Events(events) = routes::events( State(app.clone()), subscriber.clone(), Some(resume_at.into()), Query::default(), ) .await .expect("subscribe never fails"); let events = events .filter(fixtures::filter::messages()) .take(final_messages.len()) .collect::>() .immediately() .await; // This set of messages, in particular, _should not_ include any prior // messages from `initial_messages` or `resume_messages`. for message in &final_messages { assert!(events.iter().any(|event| event == message)); } }; } #[tokio::test] async fn terminates_on_token_expiry() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; // Subscribe via the endpoint let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await; let routes::Events(events) = routes::events(State(app.clone()), subscriber, None, Query::default()) .await .expect("subscribe never fails"); // Verify the resulting stream's behaviour app.logins() .expire(&fixtures::now()) .await .expect("expiring tokens succeeds"); // These should not be delivered. let messages = [ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, ]; assert!(events .filter(|event| future::ready(messages.contains(event))) .next() .immediately() .await .is_none()); } #[tokio::test] async fn terminates_on_logout() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; // Subscribe via the endpoint let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber_token = fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::now()).await; let subscriber = fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await; let routes::Events(events) = routes::events( State(app.clone()), subscriber.clone(), None, Query::default(), ) .await .expect("subscribe never fails"); // Verify the resulting stream's behaviour app.logins() .logout(&subscriber.token) .await .expect("expiring tokens succeeds"); // These should not be delivered. let messages = [ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, ]; assert!(events .filter(|event| future::ready(messages.contains(event))) .next() .immediately() .await .is_none()); }