diff options
Diffstat (limited to 'src/events/routes/test.rs')
| -rw-r--r-- | src/events/routes/test.rs | 368 |
1 files changed, 368 insertions, 0 deletions
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs new file mode 100644 index 0000000..df2d5f6 --- /dev/null +++ b/src/events/routes/test.rs @@ -0,0 +1,368 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{ + future, + stream::{self, StreamExt as _}, +}; + +use crate::{ + channel::app, + events::routes, + repo::channel::{self}, + test::fixtures::{self, future::Immediately as _}, +}; + +#[tokio::test] +async fn no_subscriptions() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let subscriber = fixtures::login::create(&app).await; + + // Call the endpoint + + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { channels: vec![] }; + let routes::Events(mut events) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect("empty subscription"); + + // Verify the structure of the response. + + assert!(events.next().immediately().await.is_none()); +} + +#[tokio::test] +async fn includes_historical_message() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app).await; + let channel = fixtures::channel::create(&app).await; + let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![channel.id.clone()], + }; + let routes::Events(mut events) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect("subscribed to valid channel"); + + // Verify the structure of the response. + + let event = events + .next() + .immediately() + .await + .expect("delivered stored message"); + + assert_eq!(channel.id, event.channel); + assert_eq!(message, event.message); +} + +#[tokio::test] +async fn includes_live_message() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app).await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![channel.id.clone()], + }; + let routes::Events(mut events) = routes::events( + State(app.clone()), + subscribed_at, + subscriber, + None, + Query(query), + ) + .await + .expect("subscribed to a valid channel"); + + // Verify the semantics + + let sender = fixtures::login::create(&app).await; + let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + + let event = events + .next() + .immediately() + .await + .expect("delivered live message"); + + assert_eq!(channel.id, event.channel); + assert_eq!(message, event.message); +} + +#[tokio::test] +async fn excludes_other_channels() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let subscribed = fixtures::channel::create(&app).await; + let unsubscribed = fixtures::channel::create(&app).await; + let sender = fixtures::login::create(&app).await; + let message = fixtures::message::send(&app, &sender, &subscribed, &fixtures::now()).await; + fixtures::message::send(&app, &sender, &unsubscribed, &fixtures::now()).await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![subscribed.id.clone()], + }; + let routes::Events(mut events) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect("subscribed to a valid channel"); + + // Verify the semantics + + let event = events + .next() + .immediately() + .await + .expect("delivered at least one message"); + + assert_eq!(subscribed.id, event.channel); + assert_eq!(message, event.message); +} + +#[tokio::test] +async fn includes_multiple_channels() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app).await; + + let channels = [ + fixtures::channel::create(&app).await, + fixtures::channel::create(&app).await, + ]; + + let messages = stream::iter(channels) + .then(|channel| async { + let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + + (channel, message) + }) + .collect::<Vec<_>>() + .await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: messages + .iter() + .map(|(channel, _)| &channel.id) + .cloned() + .collect(), + }; + let routes::Events(events) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect("subscribed to valid channels"); + + // Verify the structure of the response. + + let events = events + .take(messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + for (channel, message) in messages { + assert!(events + .iter() + .any(|event| { event.channel == channel.id && event.message == message })); + } +} + +#[tokio::test] +async fn nonexitent_channel() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = channel::Id::generate(); + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![channel.clone()], + }; + let routes::ErrorResponse(error) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect_err("subscribed to nonexistent channel"); + + // Verify the structure of the response. + + fixtures::error::expected!( + error, + app::EventsError::ChannelNotFound(error_channel), + assert_eq!(channel, error_channel) + ); +} + +#[tokio::test] +async fn sequential_messages() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app).await; + let sender = fixtures::login::create(&app).await; + + let messages = vec![ + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + ]; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![channel.id.clone()], + }; + let routes::Events(events) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect("subscribed to a valid channel"); + + // Verify the structure of the response. + + let mut events = events.filter(|event| future::ready(messages.contains(&event.message))); + + // Verify delivery in order + for message in &messages { + let event = events + .next() + .immediately() + .await + .expect("undelivered messages remaining"); + + assert_eq!(channel.id, event.channel); + assert_eq!(message, &event.message); + } +} + +#[tokio::test] +async fn resumes_from() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app).await; + let sender = fixtures::login::create(&app).await; + + let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + + let later_messages = vec![ + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + ]; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![channel.id.clone()], + }; + + let resume_at = { + // First subscription + let routes::Events(mut events) = routes::events( + State(app.clone()), + subscribed_at, + subscriber.clone(), + None, + Query(query.clone()), + ) + .await + .expect("subscribed to a valid channel"); + + let event = events.next().immediately().await.expect("delivered events"); + + assert_eq!(channel.id, event.channel); + assert_eq!(initial_message, event.message); + + event.event_id() + }; + + // Resume after disconnect + let resumed_at = fixtures::now(); + let routes::Events(resumed) = routes::events( + State(app), + resumed_at, + subscriber, + Some(resume_at.into()), + Query(query), + ) + .await + .expect("subscribed to a valid channel"); + + // Verify the structure of the response. + + let events = resumed + .take(later_messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + for message in later_messages { + assert!(events + .iter() + .any(|event| event.channel == channel.id && event.message == message)); + } +} + +#[tokio::test] +async fn removes_expired_messages() { + // Set up the environment + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app).await; + let channel = fixtures::channel::create(&app).await; + + fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![channel.id.clone()], + }; + let routes::Events(mut events) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect("subscribed to valid channel"); + + // Verify the semantics + + let event = events + .next() + .immediately() + .await + .expect("delivered messages"); + + assert_eq!(channel.id, event.channel); + assert_eq!(message, event.message); +} |
