use axum::extract::State; use axum_extra::extract::Query; use futures::{future, stream::StreamExt as _}; use crate::test::fixtures::{self, future::Expect as _}; #[tokio::test] async fn creating() { // Set up the environment let app = fixtures::scratch_app().await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Create a conversation let name = fixtures::conversation::propose(); let conversation = app .conversations() .create(&name, &fixtures::now()) .await .expect("creating a conversation succeeds"); // Verify conversation created event events .filter_map(fixtures::event::stream::conversation) .filter_map(fixtures::event::stream::conversation::created) .filter(|event| future::ready(event.conversation == conversation)) .next() .expect_some("conversation created event is delivered") .await; } #[tokio::test] async fn previously_created() { // Set up the environment let app = fixtures::scratch_app().await; let resume_point = fixtures::boot::resume_point(&app).await; // Create a conversation let name = fixtures::conversation::propose(); let conversation = app .conversations() .create(&name, &fixtures::now()) .await .expect("creating a conversation succeeds"); // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Verify conversation created event let _ = events .filter_map(fixtures::event::stream::conversation) .filter_map(fixtures::event::stream::conversation::created) .filter(|event| future::ready(event.conversation == conversation)) .next() .expect_some("conversation created event is delivered") .await; } #[tokio::test] async fn expiring() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Expire conversations app.conversations() .expire(&fixtures::now()) .await .expect("expiring conversations always succeeds"); // Check for expiry event let _ = events .filter_map(fixtures::event::stream::conversation) .filter_map(fixtures::event::stream::conversation::deleted) .filter(|event| future::ready(event.id == conversation.id)) .next() .expect_some("a deleted conversation event will be delivered") .await; } #[tokio::test] async fn previously_expired() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Expire conversations app.conversations() .expire(&fixtures::now()) .await .expect("expiring conversation always succeeds"); // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Check for expiry event let _ = events .filter_map(fixtures::event::stream::conversation) .filter_map(fixtures::event::stream::conversation::deleted) .filter(|event| future::ready(event.id == conversation.id)) .next() .expect_some("a deleted conversation event will be delivered") .await; } #[tokio::test] async fn deleting() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Delete the conversation app.conversations() .delete(&conversation.id, &fixtures::now()) .await .expect("deleting a valid conversation succeeds"); // Check for delete event let _ = events .filter_map(fixtures::event::stream::conversation) .filter_map(fixtures::event::stream::conversation::deleted) .filter(|event| future::ready(event.id == conversation.id)) .next() .expect_some("a deleted conversation event will be delivered") .await; } #[tokio::test] async fn previously_deleted() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Delete the conversation app.conversations() .delete(&conversation.id, &fixtures::now()) .await .expect("deleting a valid conversation succeeds"); // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Check for expiry event let _ = events .filter_map(fixtures::event::stream::conversation) .filter_map(fixtures::event::stream::conversation::deleted) .filter(|event| future::ready(event.id == conversation.id)) .next() .expect_some("a deleted conversation event will be delivered") .await; } #[tokio::test] async fn previously_purged() { // Set up the environment let app = fixtures::scratch_app().await; let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Delete and purge the conversation app.conversations() .delete(&conversation.id, &fixtures::ancient()) .await .expect("deleting a valid conversation succeeds"); app.conversations() .purge(&fixtures::now()) .await .expect("purging conversations always succeeds"); // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let super::Response(events) = super::handler( State(app.clone()), subscriber, None, Query(super::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); // Check for expiry event events .filter_map(fixtures::event::stream::conversation) .filter_map(fixtures::event::stream::conversation::deleted) .filter(|event| future::ready(event.id == conversation.id)) .next() .expect_wait("deleted conversation events not delivered") .await; }