use axum::extract::State; use axum_extra::extract::Query; use futures::{ future, stream::{self, StreamExt as _}, }; use crate::test::fixtures::{self, future::Expect as _}; #[tokio::test] async fn sending() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Send a message let sender = fixtures::user::create(&app, &fixtures::now()).await; let message = app .messages() .send( &conversation.id, &sender, &fixtures::now(), &fixtures::message::propose(), ) .await .expect("sending a message succeeds"); // Verify that an event is delivered let _ = events .filter_map(fixtures::event::stream::message) .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == message)) .next() .expect_some("delivered message sent event") .await; } #[tokio::test] async fn previously_sent() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Send a message let sender = fixtures::user::create(&app, &fixtures::now()).await; let message = app .messages() .send( &conversation.id, &sender, &fixtures::now(), &fixtures::message::propose(), ) .await .expect("sending a message succeeds"); // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Verify that an event is delivered let _ = events .filter_map(fixtures::event::stream::message) .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == message)) .next() .expect_some("delivered message sent event") .await; } #[tokio::test] async fn sent_in_multiple_conversations() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; let conversations = [ fixtures::conversation::create(&app, &fixtures::now()).await, fixtures::conversation::create(&app, &fixtures::now()).await, ]; let messages = stream::iter(conversations) .then(|conversation| { let app = app.clone(); let sender = sender.clone(); let conversation = conversation.clone(); async move { fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await } }) .collect::>() .await; // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Verify the structure of the response. let events = events .filter_map(fixtures::event::stream::message) .filter_map(fixtures::event::stream::message::sent) .take(messages.len()) .collect::>() .expect_ready("events ready") .await; for message in &messages { assert!(events.iter().any(|event| &event.message == message)); } } #[tokio::test] async fn sent_sequentially() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; let messages = vec![ fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, ]; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Verify the expected events in the expected order let mut events = events .filter_map(fixtures::event::stream::message) .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))); for message in &messages { let event = events .next() .expect_some("undelivered messages remaining") .await; assert_eq!(message, &event.message); } } #[tokio::test] async fn expiring() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Expire messages app.messages() .expire(&fixtures::now()) .await .expect("expiring messages always succeeds"); // Check for expiry event let _ = events .filter_map(fixtures::event::stream::message) .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") .await; } #[tokio::test] async fn previously_expired() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Expire messages app.messages() .expire(&fixtures::now()) .await .expect("expiring messages always succeeds"); // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Check for expiry event let _ = events .filter_map(fixtures::event::stream::message) .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") .await; } #[tokio::test] async fn deleting() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Delete the message app.messages() .delete(&sender, &message.id, &fixtures::now()) .await .expect("deleting a valid message succeeds"); // Check for delete event let _ = events .filter_map(fixtures::event::stream::message) .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") .await; } #[tokio::test] async fn previously_deleted() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Delete the message app.messages() .delete(&sender, &message.id, &fixtures::now()) .await .expect("deleting a valid message succeeds"); // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Check for delete event let _ = events .filter_map(fixtures::event::stream::message) .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") .await; } #[tokio::test] async fn previously_purged() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Purge the message app.messages() .delete(&sender, &message.id, &fixtures::ancient()) .await .expect("deleting a valid message succeeds"); app.messages() .purge(&fixtures::now()) .await .expect("purge always succeeds"); // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Check for delete event events .filter_map(fixtures::event::stream::message) .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_wait("no deleted message will be delivered") .await; }