use axum::extract::State;
use axum_extra::extract::Query;
use futures::{
    future,
    stream::{self, StreamExt as _},
};

use crate::{
    event::routes::get,
    test::fixtures::{self, future::Immediately as _},
};

// Tests for message-related events as observed through the event
// subscription handler (`get::handler`).
//
// Each test subscribes by calling the handler directly and then inspects the
// returned event stream. NOTE(review): `.immediately()` comes from the
// `fixtures::future::Immediately` helper trait; it presumably requires the
// future to resolve without waiting, so that a missing event fails the test
// instead of hanging it — confirm against the fixture.

/// A message sent *after* subscribing shows up on the subscriber's event
/// stream as a "sent" event carrying that message.
#[tokio::test]
async fn sending() {
    // Set up the environment
    let app = fixtures::scratch_app().await;
    let channel = fixtures::channel::create(&app, &fixtures::now()).await;

    // Call the endpoint
    let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
    let get::Response(events) =
        get::handler(State(app.clone()), subscriber, None, Query::default())
            .await
            .expect("subscribe never fails");

    // Send a message
    let sender = fixtures::login::create(&app, &fixtures::now()).await;
    let message = app
        .messages()
        .send(
            &channel.id,
            &sender,
            &fixtures::now(),
            &fixtures::message::propose(),
        )
        .await
        .expect("sending a message succeeds");

    // Verify that an event is delivered
    let _ = events
        .filter_map(fixtures::event::message)
        .filter_map(fixtures::event::message::sent)
        .filter(|event| future::ready(event.message == message))
        .next()
        .immediately()
        .await
        .expect("delivered message sent event");
}

/// A message sent *before* subscribing is still delivered to a new
/// subscriber as a "sent" event.
#[tokio::test]
async fn previously_sent() {
    // Set up the environment
    let app = fixtures::scratch_app().await;
    let channel = fixtures::channel::create(&app, &fixtures::now()).await;

    // Send a message
    let sender = fixtures::login::create(&app, &fixtures::now()).await;
    let message = app
        .messages()
        .send(
            &channel.id,
            &sender,
            &fixtures::now(),
            &fixtures::message::propose(),
        )
        .await
        .expect("sending a message succeeds");

    // Call the endpoint
    let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
    let get::Response(events) =
        get::handler(State(app.clone()), subscriber, None, Query::default())
            .await
            .expect("subscribe never fails");

    // Verify that an event is delivered
    let _ = events
        .filter_map(fixtures::event::message)
        .filter_map(fixtures::event::message::sent)
        .filter(|event| future::ready(event.message == message))
        .next()
        .immediately()
        .await
        .expect("delivered message sent event");
}

/// Messages sent to two different channels are all delivered to a single
/// subscriber; delivery order across channels is not checked.
#[tokio::test]
async fn sent_in_multiple_channels() {
    // Set up the environment
    let app = fixtures::scratch_app().await;
    let sender = fixtures::login::create(&app, &fixtures::now()).await;

    let channels = [
        fixtures::channel::create(&app, &fixtures::now()).await,
        fixtures::channel::create(&app, &fixtures::now()).await,
    ];

    // Send one message per channel, one after another (`then` awaits each
    // future before starting the next).
    let messages = stream::iter(channels)
        .then(|channel| {
            // `app` and `sender` are captured by reference, so each future
            // needs its own copy to move in; `channel` is already owned.
            let app = app.clone();
            let sender = sender.clone();
            async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await }
        })
        .collect::<Vec<_>>()
        .await;

    // Call the endpoint
    let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
    let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
        .await
        .expect("subscribe never fails");

    // Verify the structure of the response.
    // Only take as many "sent" events as messages were sent, so the collect
    // finishes even if the underlying stream itself never ends.
    let events = events
        .filter_map(fixtures::event::message)
        .filter_map(fixtures::event::message::sent)
        .take(messages.len())
        .collect::<Vec<_>>()
        .immediately()
        .await;

    for message in &messages {
        assert!(events.iter().any(|event| &event.message == message));
    }
}

/// Several messages sent to one channel are delivered to a later subscriber
/// in the same order in which they were sent.
#[tokio::test]
async fn sent_sequentially() {
    // Set up the environment
    let app = fixtures::scratch_app().await;
    let channel = fixtures::channel::create(&app, &fixtures::now()).await;
    let sender = fixtures::login::create(&app, &fixtures::now()).await;

    let messages = vec![
        fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
        fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
        fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
    ];

    // Subscribe
    let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
    let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
        .await
        .expect("subscribe never fails");

    // Verify the expected events in the expected order
    // Unrelated "sent" events are filtered out so that only the messages
    // created above are compared position by position.
    let mut events = events
        .filter_map(fixtures::event::message)
        .filter_map(fixtures::event::message::sent)
        .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)));

    for message in &messages {
        let event = events
            .next()
            .immediately()
            .await
            .expect("undelivered messages remaining");

        assert_eq!(message, &event.message);
    }
}

/// Expiring a message while a subscriber is connected delivers a "deleted"
/// event for that message.
///
/// NOTE(review): `fixtures::ancient()` is presumably a timestamp old enough
/// for `expire(now)` to select the message — confirm against the fixture.
#[tokio::test]
async fn expiring() {
    // Set up the environment
    let app = fixtures::scratch_app().await;
    let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
    let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
    let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;

    // Subscribe
    let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
    let get::Response(events) =
        get::handler(State(app.clone()), subscriber, None, Query::default())
            .await
            .expect("subscribe never fails");

    // Expire messages
    app.messages()
        .expire(&fixtures::now())
        .await
        .expect("expiring messages always succeeds");

    // Check for expiry event
    let _ = events
        .filter_map(fixtures::event::message)
        .filter_map(fixtures::event::message::deleted)
        .filter(|event| future::ready(event.id == message.id))
        .next()
        .immediately()
        .await
        .expect("a deleted message event will be delivered");
}

/// A message that expired *before* subscribing still yields a "deleted"
/// event for a new subscriber.
///
/// NOTE(review): `fixtures::ancient()` is presumably a timestamp old enough
/// for `expire(now)` to select the message — confirm against the fixture.
#[tokio::test]
async fn previously_expired() {
    // Set up the environment
    let app = fixtures::scratch_app().await;
    let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
    let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
    let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;

    // Expire messages
    app.messages()
        .expire(&fixtures::now())
        .await
        .expect("expiring messages always succeeds");

    // Subscribe
    let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
    let get::Response(events) =
        get::handler(State(app.clone()), subscriber, None, Query::default())
            .await
            .expect("subscribe never fails");

    // Check for expiry event
    let _ = events
        .filter_map(fixtures::event::message)
        .filter_map(fixtures::event::message::deleted)
        .filter(|event| future::ready(event.id == message.id))
        .next()
        .immediately()
        .await
        .expect("a deleted message event will be delivered");
}

/// Explicitly deleting a message while a subscriber is connected delivers a
/// "deleted" event for that message.
#[tokio::test]
async fn deleting() {
    // Set up the environment
    let app = fixtures::scratch_app().await;
    let channel = fixtures::channel::create(&app, &fixtures::now()).await;
    let sender = fixtures::login::create(&app, &fixtures::now()).await;
    let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;

    // Subscribe
    let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
    let get::Response(events) =
        get::handler(State(app.clone()), subscriber, None, Query::default())
            .await
            .expect("subscribe never fails");

    // Delete the message
    app.messages()
        .delete(&message.id, &fixtures::now())
        .await
        .expect("deleting a valid message succeeds");

    // Check for delete event
    let _ = events
        .filter_map(fixtures::event::message)
        .filter_map(fixtures::event::message::deleted)
        .filter(|event| future::ready(event.id == message.id))
        .next()
        .immediately()
        .await
        .expect("a deleted message event will be delivered");
}

/// A message deleted *before* subscribing still yields a "deleted" event for
/// a new subscriber.
#[tokio::test]
async fn previously_deleted() {
    // Set up the environment
    let app = fixtures::scratch_app().await;
    let channel = fixtures::channel::create(&app, &fixtures::now()).await;
    let sender = fixtures::login::create(&app, &fixtures::now()).await;
    let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;

    // Delete the message
    app.messages()
        .delete(&message.id, &fixtures::now())
        .await
        .expect("deleting a valid message succeeds");

    // Subscribe
    let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
    let get::Response(events) =
        get::handler(State(app.clone()), subscriber, None, Query::default())
            .await
            .expect("subscribe never fails");

    // Check for delete event
    let _ = events
        .filter_map(fixtures::event::message)
        .filter_map(fixtures::event::message::deleted)
        .filter(|event| future::ready(event.id == message.id))
        .next()
        .immediately()
        .await
        .expect("a deleted message event will be delivered");
}