diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-06-17 02:11:45 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-06-18 18:31:40 -0400 |
| commit | 4e3d5ccac99b24934c972e088cd7eb02bb95df06 (patch) | |
| tree | c94f5a42f7e734b81892c1289a1d2b566706ba7c /src/event/handlers/stream/test/message.rs | |
| parent | 5ed96f8e8b9d9f19ee249f5c73a5a21ef6bca09f (diff) | |
Handlers are _named operations_, which can be exposed via routes.
Each domain module that exposes handlers does so through a `handlers` child module, ideally as a top-level symbol that can be plugged directly into Axum's `MethodRouter`. Modules could make exceptions to this - kill the doctrinaire inside yourself, after all - but none of the API modules that actually exist need such exceptions, and consistency is useful.
The related details of request types, URL types, response types, errors, &c &c are then organized into modules under `handlers`, along with their respective tests.
Diffstat (limited to 'src/event/handlers/stream/test/message.rs')
| -rw-r--r-- | src/event/handlers/stream/test/message.rs | 393 |
1 files changed, 393 insertions, 0 deletions
diff --git a/src/event/handlers/stream/test/message.rs b/src/event/handlers/stream/test/message.rs new file mode 100644 index 0000000..a80c896 --- /dev/null +++ b/src/event/handlers/stream/test/message.rs @@ -0,0 +1,393 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{ + future, + stream::{self, StreamExt as _}, +}; + +use crate::test::fixtures::{self, future::Expect as _}; + +#[tokio::test] +async fn sending() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Send a message + + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let message = app + .messages() + .send( + &channel.id, + &sender, + &fixtures::now(), + &fixtures::message::propose(), + ) + .await + .expect("sending a message succeeds"); + + // Verify that an event is delivered + + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(event.message == message)) + .next() + .expect_some("delivered message sent event") + .await; +} + +#[tokio::test] +async fn previously_sent() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Send a message + + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let message = app + .messages() + .send( + &channel.id, + &sender, + &fixtures::now(), + &fixtures::message::propose(), + ) + .await + .expect("sending a message succeeds"); + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify that an event is delivered + + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(event.message == message)) + .next() + .expect_some("delivered message sent event") + .await; +} + +#[tokio::test] +async fn sent_in_multiple_channels() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + let channels = [ + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, + ]; + + let messages = stream::iter(channels) + .then(|channel| { + let app = app.clone(); + let sender = sender.clone(); + let channel = channel.clone(); + async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } + }) + .collect::<Vec<_>>() + .await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify the structure of the response. + + let events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .take(messages.len()) + .collect::<Vec<_>>() + .expect_ready("events ready") + .await; + + for message in &messages { + assert!(events.iter().any(|event| &event.message == message)); + } +} + +#[tokio::test] +async fn sent_sequentially() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + let messages = vec![ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify the expected events in the expected order + + let mut events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))); + + for message in &messages { + let event = events + .next() + .expect_some("undelivered messages remaining") + .await; + + assert_eq!(message, &event.message); + } +} + +#[tokio::test] +async fn expiring() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let sender = fixtures::user::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Expire messages + + app.messages() + .expire(&fixtures::now()) + .await + .expect("expiring messages always succeeds"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_some("a deleted message event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let sender = fixtures::user::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Expire messages + + app.messages() + .expire(&fixtures::now()) + .await + .expect("expiring messages always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_some("a deleted message event will be delivered") + .await; +} + +#[tokio::test] +async fn deleting() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Delete the message + + app.messages() + .delete(&sender, &message.id, &fixtures::now()) + .await + .expect("deleting a valid message succeeds"); + + // Check for delete event + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_some("a deleted message event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Delete the message + + app.messages() + .delete(&sender, &message.id, &fixtures::now()) + .await + .expect("deleting a valid message succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for delete event + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_some("a deleted message event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_purged() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let sender = fixtures::user::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Purge the message + + app.messages() + .delete(&sender, &message.id, &fixtures::ancient()) + .await + .expect("deleting a valid message succeeds"); + + app.messages() + .purge(&fixtures::now()) + .await + .expect("purge always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for delete event + + events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_wait("no deleted message will be delivered") + .await; +} |
