diff options
Diffstat (limited to 'src/event/handlers/stream/test/conversation.rs')
| -rw-r--r-- | src/event/handlers/stream/test/conversation.rs | 273 |
1 files changed, 273 insertions, 0 deletions
diff --git a/src/event/handlers/stream/test/conversation.rs b/src/event/handlers/stream/test/conversation.rs new file mode 100644 index 0000000..5e08075 --- /dev/null +++ b/src/event/handlers/stream/test/conversation.rs @@ -0,0 +1,273 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::test::fixtures::{self, future::Expect as _}; + +#[tokio::test] +async fn creating() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Create a conversation + + let name = fixtures::conversation::propose(); + let conversation = app + .conversations() + .create(&name, &fixtures::now()) + .await + .expect("creating a conversation succeeds"); + + // Verify conversation created event + + events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::created) + .filter(|event| future::ready(event.conversation == conversation)) + .next() + .expect_some("conversation created event is delivered") + .await; +} + +#[tokio::test] +async fn previously_created() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Create a conversation + + let name = fixtures::conversation::propose(); + let conversation = app + .conversations() + .create(&name, &fixtures::now()) + .await + .expect("creating a conversation succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify conversation created event + + let _ = events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::created) + .filter(|event| future::ready(event.conversation == conversation)) + .next() + .expect_some("conversation created event is delivered") + .await; +} + +#[tokio::test] +async fn expiring() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Expire conversations + + app.conversations() + .expire(&fixtures::now()) + .await + .expect("expiring conversations always succeeds"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) + .next() + .expect_some("a deleted conversation event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Expire conversations + + app.conversations() + .expire(&fixtures::now()) + .await + .expect("expiring conversation always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) + .next() + .expect_some("a deleted conversation event will be delivered") + .await; +} + +#[tokio::test] +async fn deleting() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Delete the conversation + + app.conversations() + .delete(&conversation.id, &fixtures::now()) + .await + .expect("deleting a valid conversation succeeds"); + + // Check for delete event + let _ = events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) + .next() + .expect_some("a deleted conversation event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Delete the conversation + + app.conversations() + .delete(&conversation.id, &fixtures::now()) + .await + .expect("deleting a valid conversation succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) + .next() + .expect_some("a deleted conversation event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_purged() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Delete and purge the conversation + + app.conversations() + .delete(&conversation.id, &fixtures::ancient()) + .await + .expect("deleting a valid conversation succeeds"); + + app.conversations() + .purge(&fixtures::now()) + .await + .expect("purging conversations always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expiry event + events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) + .next() + .expect_wait("deleted conversation events not delivered") + .await; +} |
