use axum::extract::State; use axum_extra::extract::Query; use futures::{ future, stream::{self, StreamExt as _}, }; use crate::{ events::{app, routes}, repo::channel::{self}, test::fixtures::{self, future::Immediately as _}, }; #[tokio::test] async fn no_subscriptions() { // Set up the environment let app = fixtures::scratch_app().await; let subscriber = fixtures::login::create(&app).await; // Call the endpoint let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: [].into(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect("empty subscription"); // Verify the structure of the response. assert!(events.next().immediately().await.is_none()); } #[tokio::test] async fn includes_historical_message() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; let channel = fixtures::channel::create(&app).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: [channel.id.clone()].into(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect("subscribed to valid channel"); // Verify the structure of the response. let routes::ReplayableEvent(_, event) = events .next() .immediately() .await .expect("delivered stored message"); assert_eq!(channel.id, event.channel); assert_eq!(message, event.message); } #[tokio::test] async fn includes_live_message() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app).await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: [channel.id.clone()].into(), }; let routes::Events(mut events) = routes::events( State(app.clone()), subscribed_at, subscriber, None, Query(query), ) .await .expect("subscribed to a valid channel"); // Verify the semantics let sender = fixtures::login::create(&app).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; let routes::ReplayableEvent(_, event) = events .next() .immediately() .await .expect("delivered live message"); assert_eq!(channel.id, event.channel); assert_eq!(message, event.message); } #[tokio::test] async fn excludes_other_channels() { // Set up the environment let app = fixtures::scratch_app().await; let subscribed_channel = fixtures::channel::create(&app).await; let unsubscribed_channel = fixtures::channel::create(&app).await; let sender = fixtures::login::create(&app).await; let message = fixtures::message::send(&app, &sender, &subscribed_channel, &fixtures::now()).await; fixtures::message::send(&app, &sender, &unsubscribed_channel, &fixtures::now()).await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: [subscribed_channel.id.clone()].into(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect("subscribed to a valid channel"); // Verify the semantics let routes::ReplayableEvent(_, event) = events .next() .immediately() .await .expect("delivered at least one message"); assert_eq!(subscribed_channel.id, event.channel); assert_eq!(message, event.message); } #[tokio::test] async fn includes_multiple_channels() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; let channels = [ fixtures::channel::create(&app).await, fixtures::channel::create(&app).await, ]; let messages = stream::iter(channels) .then(|channel| async { let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; (channel, message) }) .collect::>() .await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: messages .iter() .map(|(channel, _)| &channel.id) .cloned() .collect(), }; let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect("subscribed to valid channels"); // Verify the structure of the response. let events = events .take(messages.len()) .collect::>() .immediately() .await; for (channel, message) in messages { assert!(events.iter().any(|routes::ReplayableEvent(_, event)| { event.channel == channel.id && event.message == message })); } } #[tokio::test] async fn nonexistent_channel() { // Set up the environment let app = fixtures::scratch_app().await; let channel = channel::Id::generate(); // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: [channel.clone()].into(), }; let routes::ErrorResponse(error) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect_err("subscribed to nonexistent channel"); // Verify the structure of the response. fixtures::error::expected!( error, app::EventsError::ChannelNotFound(error_channel), assert_eq!(channel, error_channel) ); } #[tokio::test] async fn sequential_messages() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app).await; let sender = fixtures::login::create(&app).await; let messages = vec![ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, ]; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: [channel.id.clone()].into(), }; let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect("subscribed to a valid channel"); // Verify the structure of the response. let mut events = events.filter(|routes::ReplayableEvent(_, event)| { future::ready(messages.contains(&event.message)) }); // Verify delivery in order for message in &messages { let routes::ReplayableEvent(_, event) = events .next() .immediately() .await .expect("undelivered messages remaining"); assert_eq!(channel.id, event.channel); assert_eq!(message, &event.message); } } #[tokio::test] async fn resumes_from() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app).await; let sender = fixtures::login::create(&app).await; let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; let later_messages = vec![ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, ]; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: [channel.id.clone()].into(), }; let resume_at = { // First subscription let routes::Events(mut events) = routes::events( State(app.clone()), subscribed_at, subscriber.clone(), None, Query(query.clone()), ) .await .expect("subscribed to a valid channel"); let routes::ReplayableEvent(id, event) = events.next().immediately().await.expect("delivered events"); assert_eq!(channel.id, event.channel); assert_eq!(initial_message, event.message); id }; // Resume after disconnect let reconnect_at = fixtures::now(); let routes::Events(resumed) = routes::events( State(app), reconnect_at, subscriber, Some(resume_at.into()), Query(query), ) .await .expect("subscribed to a valid channel"); // Verify the structure of the response. let events = resumed .take(later_messages.len()) .collect::>() .immediately() .await; for message in later_messages { assert!(events.iter().any( |routes::ReplayableEvent(_, event)| event.channel == channel.id && event.message == message )); } } // This test verifies a real bug I hit developing the vector-of-sequences // approach to resuming events. A small omission caused the event IDs in a // resumed stream to _omit_ channels that were in the original stream until // those channels also appeared in the resumed stream. // // Clients would see something like // * In the original stream, Cfoo=5,Cbar=8 // * In the resumed stream, Cfoo=6 (no Cbar sequence number) // // Disconnecting and reconnecting a second time, using event IDs from that // initial period of the first resume attempt, would then cause the second // resume attempt to restart all other channels from the beginning, and not // from where the first disconnection happened. // // This is a real and valid behaviour for clients! #[tokio::test] async fn serial_resume() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; let channel_a = fixtures::channel::create(&app).await; let channel_b = fixtures::channel::create(&app).await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let query = routes::EventsQuery { channels: [channel_a.id.clone(), channel_b.id.clone()].into(), }; let resume_at = { let initial_messages = [ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, ]; // First subscription let subscribed_at = fixtures::now(); let routes::Events(events) = routes::events( State(app.clone()), subscribed_at, subscriber.clone(), None, Query(query.clone()), ) .await .expect("subscribed to a valid channel"); let events = events .take(initial_messages.len()) .collect::>() .immediately() .await; for message in initial_messages { assert!(events .iter() .any(|routes::ReplayableEvent(_, event)| event.message == message)); } let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty"); id.to_owned() }; // Resume after disconnect let resume_at = { let resume_messages = [ // Note that channel_b does not appear here. The buggy behaviour // would be masked if channel_b happened to send a new message // into the resumed event stream. fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, ]; // Second subscription let resubscribed_at = fixtures::now(); let routes::Events(events) = routes::events( State(app.clone()), resubscribed_at, subscriber.clone(), Some(resume_at.into()), Query(query.clone()), ) .await .expect("subscribed to a valid channel"); let events = events .take(resume_messages.len()) .collect::>() .immediately() .await; for message in resume_messages { assert!(events .iter() .any(|routes::ReplayableEvent(_, event)| event.message == message)); } let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty"); id.to_owned() }; // Resume after disconnect a second time { // At this point, we can send on either channel and demonstrate the // problem. The resume point should before both of these messages, but // after _all_ prior messages. let final_messages = [ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, ]; // Second subscription let resubscribed_at = fixtures::now(); let routes::Events(events) = routes::events( State(app.clone()), resubscribed_at, subscriber.clone(), Some(resume_at.into()), Query(query.clone()), ) .await .expect("subscribed to a valid channel"); let events = events .take(final_messages.len()) .collect::>() .immediately() .await; // This set of messages, in particular, _should not_ include any prior // messages from `initial_messages` or `resume_messages`. for message in final_messages { assert!(events .iter() .any(|routes::ReplayableEvent(_, event)| event.message == message)); } }; } #[tokio::test] async fn removes_expired_messages() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; let channel = fixtures::channel::create(&app).await; fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: [channel.id.clone()].into(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect("subscribed to valid channel"); // Verify the semantics let routes::ReplayableEvent(_, event) = events .next() .immediately() .await .expect("delivered messages"); assert_eq!(channel.id, event.channel); assert_eq!(message, event.message); }