use axum::extract::State; use axum_extra::extract::Query; use futures::{ future, stream::{self, StreamExt as _}, }; use crate::{ channel::app, events::routes, repo::channel::{self}, test::fixtures::{self, future::Immediately as _}, }; #[tokio::test] async fn no_subscriptions() { // Set up the environment let app = fixtures::scratch_app().await; let subscriber = fixtures::login::create(&app).await; // Call the endpoint let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: vec![] }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect("empty subscription"); // Verify the structure of the response. assert!(events.next().immediately().await.is_none()); } #[tokio::test] async fn includes_historical_message() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; let channel = fixtures::channel::create(&app).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: vec![channel.id.clone()], }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect("subscribed to valid channel"); // Verify the structure of the response. let event = events .next() .immediately() .await .expect("delivered stored message"); assert_eq!(channel.id, event.channel); assert_eq!(message, event.message); } #[tokio::test] async fn includes_live_message() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app).await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: vec![channel.id.clone()], }; let routes::Events(mut events) = routes::events( State(app.clone()), subscribed_at, subscriber, None, Query(query), ) .await .expect("subscribed to a valid channel"); // Verify the semantics let sender = fixtures::login::create(&app).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; let event = events .next() .immediately() .await .expect("delivered live message"); assert_eq!(channel.id, event.channel); assert_eq!(message, event.message); } #[tokio::test] async fn excludes_other_channels() { // Set up the environment let app = fixtures::scratch_app().await; let subscribed = fixtures::channel::create(&app).await; let unsubscribed = fixtures::channel::create(&app).await; let sender = fixtures::login::create(&app).await; let message = fixtures::message::send(&app, &sender, &subscribed, &fixtures::now()).await; fixtures::message::send(&app, &sender, &unsubscribed, &fixtures::now()).await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: vec![subscribed.id.clone()], }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect("subscribed to a valid channel"); // Verify the semantics let event = events .next() .immediately() .await .expect("delivered at least one message"); assert_eq!(subscribed.id, event.channel); assert_eq!(message, event.message); } #[tokio::test] async fn includes_multiple_channels() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; let channels = [ fixtures::channel::create(&app).await, fixtures::channel::create(&app).await, ]; let messages = stream::iter(channels) .then(|channel| async { let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; (channel, message) }) .collect::>() .await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: messages .iter() .map(|(channel, _)| &channel.id) .cloned() .collect(), }; let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect("subscribed to valid channels"); // Verify the structure of the response. let events = events .take(messages.len()) .collect::>() .immediately() .await; for (channel, message) in messages { assert!(events .iter() .any(|event| { event.channel == channel.id && event.message == message })); } } #[tokio::test] async fn nonexitent_channel() { // Set up the environment let app = fixtures::scratch_app().await; let channel = channel::Id::generate(); // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: vec![channel.clone()], }; let routes::ErrorResponse(error) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect_err("subscribed to nonexistent channel"); // Verify the structure of the response. fixtures::error::expected!( error, app::EventsError::ChannelNotFound(error_channel), assert_eq!(channel, error_channel) ); } #[tokio::test] async fn sequential_messages() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app).await; let sender = fixtures::login::create(&app).await; let messages = vec![ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, ]; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: vec![channel.id.clone()], }; let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect("subscribed to a valid channel"); // Verify the structure of the response. let mut events = events.filter(|event| future::ready(messages.contains(&event.message))); // Verify delivery in order for message in &messages { let event = events .next() .immediately() .await .expect("undelivered messages remaining"); assert_eq!(channel.id, event.channel); assert_eq!(message, &event.message); } } #[tokio::test] async fn resumes_from() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app).await; let sender = fixtures::login::create(&app).await; let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; let later_messages = vec![ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, ]; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: vec![channel.id.clone()], }; let resume_at = { // First subscription let routes::Events(mut events) = routes::events( State(app.clone()), subscribed_at, subscriber.clone(), None, Query(query.clone()), ) .await .expect("subscribed to a valid channel"); let event = events.next().immediately().await.expect("delivered events"); assert_eq!(channel.id, event.channel); assert_eq!(initial_message, event.message); event.event_id() }; // Resume after disconnect let resumed_at = fixtures::now(); let routes::Events(resumed) = routes::events( State(app), resumed_at, subscriber, Some(resume_at.into()), Query(query), ) .await .expect("subscribed to a valid channel"); // Verify the structure of the response. let events = resumed .take(later_messages.len()) .collect::>() .immediately() .await; for message in later_messages { assert!(events .iter() .any(|event| event.channel == channel.id && event.message == message)); } } #[tokio::test] async fn removes_expired_messages() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; let channel = fixtures::channel::create(&app).await; fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { channels: vec![channel.id.clone()], }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await .expect("subscribed to valid channel"); // Verify the semantics let event = events .next() .immediately() .await .expect("delivered messages"); assert_eq!(channel.id, event.channel); assert_eq!(message, event.message); }