summaryrefslogtreecommitdiff
path: root/src/event/handlers/stream/test/channel.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2025-06-17 02:11:45 -0400
committerOwen Jacobson <owen@grimoire.ca>2025-06-18 18:31:40 -0400
commit4e3d5ccac99b24934c972e088cd7eb02bb95df06 (patch)
treec94f5a42f7e734b81892c1289a1d2b566706ba7c /src/event/handlers/stream/test/channel.rs
parent5ed96f8e8b9d9f19ee249f5c73a5a21ef6bca09f (diff)
Handlers are _named operations_, which can be exposed via routes.
Each domain module that exposes handlers does so through a `handlers` child module, ideally as a top-level symbol that can be plugged directly into Axum's `MethodRouter`. Modules could make exceptions to this - kill the doctrinaire inside yourself, after all - but none of the API modules that actually exist need such exceptions, and consistency is useful. The related details of request types, URL types, response types, errors, &c &c are then organized into modules under `handlers`, along with their respective tests.
Diffstat (limited to 'src/event/handlers/stream/test/channel.rs')
-rw-r--r--src/event/handlers/stream/test/channel.rs273
1 files changed, 273 insertions, 0 deletions
diff --git a/src/event/handlers/stream/test/channel.rs b/src/event/handlers/stream/test/channel.rs
new file mode 100644
index 0000000..187c3c3
--- /dev/null
+++ b/src/event/handlers/stream/test/channel.rs
@@ -0,0 +1,273 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::test::fixtures::{self, future::Expect as _};
+
+#[tokio::test]
+async fn creating() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Create a channel
+
+ let name = fixtures::channel::propose();
+ let channel = app
+ .channels()
+ .create(&name, &fixtures::now())
+ .await
+ .expect("creating a channel succeeds");
+
+ // Verify channel created event
+
+ events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .filter(|event| future::ready(event.channel == channel))
+ .next()
+ .expect_some("channel created event is delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_created() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Create a channel
+
+ let name = fixtures::channel::propose();
+ let channel = app
+ .channels()
+ .create(&name, &fixtures::now())
+ .await
+ .expect("creating a channel succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify channel created event
+
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .filter(|event| future::ready(event.channel == channel))
+ .next()
+ .expect_some("channel created event is delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn expiring() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Expire channels
+
+ app.channels()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring channels always succeeds");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_expired() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Expire channels
+
+ app.channels()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring channels always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn deleting() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Delete the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::now())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ // Check for delete event
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_deleted() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Delete the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::now())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_purged() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Delete and purge the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::ancient())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ app.channels()
+ .purge(&fixtures::now())
+ .await
+ .expect("purging channels always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_wait("deleted channel events not delivered")
+ .await;
+}