summaryrefslogtreecommitdiff
path: root/src/event/handlers
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2025-06-17 02:11:45 -0400
committerOwen Jacobson <owen@grimoire.ca>2025-06-18 18:31:40 -0400
commit4e3d5ccac99b24934c972e088cd7eb02bb95df06 (patch)
treec94f5a42f7e734b81892c1289a1d2b566706ba7c /src/event/handlers
parent5ed96f8e8b9d9f19ee249f5c73a5a21ef6bca09f (diff)
Handlers are _named operations_, which can be exposed via routes.
Each domain module that exposes handlers does so through a `handlers` child module, ideally as a top-level symbol that can be plugged directly into Axum's `MethodRouter`. Modules could make exceptions to this - kill the doctrinaire inside yourself, after all - but none of the API modules that actually exist need such exceptions, and consistency is useful. The related details of request types, URL types, response types, errors, &c &c are then organized into modules under `handlers`, along with their respective tests.
Diffstat (limited to 'src/event/handlers')
-rw-r--r--src/event/handlers/mod.rs3
-rw-r--r--src/event/handlers/stream/mod.rs85
-rw-r--r--src/event/handlers/stream/test/channel.rs273
-rw-r--r--src/event/handlers/stream/test/invite.rs87
-rw-r--r--src/event/handlers/stream/test/message.rs393
-rw-r--r--src/event/handlers/stream/test/mod.rs8
-rw-r--r--src/event/handlers/stream/test/resume.rs227
-rw-r--r--src/event/handlers/stream/test/setup.rs47
-rw-r--r--src/event/handlers/stream/test/token.rs148
9 files changed, 1271 insertions, 0 deletions
diff --git a/src/event/handlers/mod.rs b/src/event/handlers/mod.rs
new file mode 100644
index 0000000..22d988c
--- /dev/null
+++ b/src/event/handlers/mod.rs
@@ -0,0 +1,3 @@
+mod stream;
+
+pub use stream::handler as stream;
diff --git a/src/event/handlers/stream/mod.rs b/src/event/handlers/stream/mod.rs
new file mode 100644
index 0000000..d0d3f08
--- /dev/null
+++ b/src/event/handlers/stream/mod.rs
@@ -0,0 +1,85 @@
+use axum::{
+ extract::State,
+ response::{
+ self, IntoResponse,
+ sse::{self, Sse},
+ },
+};
+use axum_extra::extract::Query;
+use futures::stream::{Stream, StreamExt as _};
+
+use crate::{
+ app::App,
+ error::{Internal, Unauthorized},
+ event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId},
+ token::{app::ValidateError, extract::Identity},
+};
+
+#[cfg(test)]
+mod test;
+
+pub async fn handler(
+ State(app): State<App>,
+ identity: Identity,
+ last_event_id: Option<LastEventId<Sequence>>,
+ Query(query): Query<QueryParams>,
+) -> Result<Response<impl Stream<Item = Event> + std::fmt::Debug>, Error> {
+ let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner);
+
+ let stream = app.events().subscribe(resume_at).await?;
+ let stream = app.tokens().limit_stream(identity.token, stream).await?;
+
+ Ok(Response(stream))
+}
+
+#[derive(serde::Deserialize)]
+pub struct QueryParams {
+ pub resume_point: Sequence,
+}
+
+#[derive(Debug)]
+pub struct Response<S>(pub S);
+
+impl<S> IntoResponse for Response<S>
+where
+ S: Stream<Item = Event> + Send + 'static,
+{
+ fn into_response(self) -> response::Response {
+ let Self(stream) = self;
+ let stream = stream.map(sse::Event::try_from);
+ let heartbeat = match Heartbeat.try_into().map_err(Internal::from) {
+ Ok(heartbeat) => heartbeat,
+ Err(err) => return err.into_response(),
+ };
+ Sse::new(stream).keep_alive(heartbeat).into_response()
+ }
+}
+
+impl TryFrom<Event> for sse::Event {
+ type Error = serde_json::Error;
+
+ fn try_from(event: Event) -> Result<Self, Self::Error> {
+ let id = serde_json::to_string(&event.sequence())?;
+ let data = serde_json::to_string_pretty(&event)?;
+
+ let event = Self::default().id(id).data(data);
+
+ Ok(event)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum Error {
+ Subscribe(#[from] app::Error),
+ Validate(#[from] ValidateError),
+}
+
+impl IntoResponse for Error {
+ fn into_response(self) -> response::Response {
+ match self {
+ Self::Validate(ValidateError::InvalidToken) => Unauthorized.into_response(),
+ other => Internal::from(other).into_response(),
+ }
+ }
+}
diff --git a/src/event/handlers/stream/test/channel.rs b/src/event/handlers/stream/test/channel.rs
new file mode 100644
index 0000000..187c3c3
--- /dev/null
+++ b/src/event/handlers/stream/test/channel.rs
@@ -0,0 +1,273 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::test::fixtures::{self, future::Expect as _};
+
+#[tokio::test]
+async fn creating() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Create a channel
+
+ let name = fixtures::channel::propose();
+ let channel = app
+ .channels()
+ .create(&name, &fixtures::now())
+ .await
+ .expect("creating a channel succeeds");
+
+ // Verify channel created event
+
+ events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .filter(|event| future::ready(event.channel == channel))
+ .next()
+ .expect_some("channel created event is delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_created() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Create a channel
+
+ let name = fixtures::channel::propose();
+ let channel = app
+ .channels()
+ .create(&name, &fixtures::now())
+ .await
+ .expect("creating a channel succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify channel created event
+
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .filter(|event| future::ready(event.channel == channel))
+ .next()
+ .expect_some("channel created event is delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn expiring() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Expire channels
+
+ app.channels()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring channels always succeeds");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_expired() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Expire channels
+
+ app.channels()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring channels always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn deleting() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Delete the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::now())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ // Check for delete event
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_deleted() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Delete the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::now())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_purged() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Delete and purge the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::ancient())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ app.channels()
+ .purge(&fixtures::now())
+ .await
+ .expect("purging channels always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_wait("deleted channel events not delivered")
+ .await;
+}
diff --git a/src/event/handlers/stream/test/invite.rs b/src/event/handlers/stream/test/invite.rs
new file mode 100644
index 0000000..c8e12fb
--- /dev/null
+++ b/src/event/handlers/stream/test/invite.rs
@@ -0,0 +1,87 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::test::fixtures::{self, future::Expect as _};
+
+#[tokio::test]
+async fn accepting_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::user::create(&app, &fixtures::now()).await;
+ let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Accept the invite
+
+ let (name, password) = fixtures::user::propose();
+ let (joiner, _) = app
+ .invites()
+ .accept(&invite.id, &name, &password, &fixtures::now())
+ .await
+ .expect("accepting an invite succeeds");
+
+ // Expect a login created event
+
+ let _ = events
+ .filter_map(fixtures::event::user)
+ .filter_map(fixtures::event::user::created)
+ .filter(|event| future::ready(event.user == joiner))
+ .next()
+ .expect_some("a login created event is sent")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_accepted_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::user::create(&app, &fixtures::now()).await;
+ let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Accept the invite
+
+ let (name, password) = fixtures::user::propose();
+ let (joiner, _) = app
+ .invites()
+ .accept(&invite.id, &name, &password, &fixtures::now())
+ .await
+ .expect("accepting an invite succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Expect a login created event
+
+ let _ = events
+ .filter_map(fixtures::event::user)
+ .filter_map(fixtures::event::user::created)
+ .filter(|event| future::ready(event.user == joiner))
+ .next()
+ .expect_some("a login created event is sent")
+ .await;
+}
diff --git a/src/event/handlers/stream/test/message.rs b/src/event/handlers/stream/test/message.rs
new file mode 100644
index 0000000..a80c896
--- /dev/null
+++ b/src/event/handlers/stream/test/message.rs
@@ -0,0 +1,393 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+};
+
+use crate::test::fixtures::{self, future::Expect as _};
+
+#[tokio::test]
+async fn sending() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Send a message
+
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let message = app
+ .messages()
+ .send(
+ &channel.id,
+ &sender,
+ &fixtures::now(),
+ &fixtures::message::propose(),
+ )
+ .await
+ .expect("sending a message succeeds");
+
+ // Verify that an event is delivered
+
+ let _ = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(event.message == message))
+ .next()
+ .expect_some("delivered message sent event")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_sent() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Send a message
+
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let message = app
+ .messages()
+ .send(
+ &channel.id,
+ &sender,
+ &fixtures::now(),
+ &fixtures::message::propose(),
+ )
+ .await
+ .expect("sending a message succeeds");
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify that an event is delivered
+
+ let _ = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(event.message == message))
+ .next()
+ .expect_some("delivered message sent event")
+ .await;
+}
+
+#[tokio::test]
+async fn sent_in_multiple_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ let channels = [
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ ];
+
+ let messages = stream::iter(channels)
+ .then(|channel| {
+ let app = app.clone();
+ let sender = sender.clone();
+ let channel = channel.clone();
+ async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await }
+ })
+ .collect::<Vec<_>>()
+ .await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the structure of the response.
+
+ let events = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .take(messages.len())
+ .collect::<Vec<_>>()
+ .expect_ready("events ready")
+ .await;
+
+ for message in &messages {
+ assert!(events.iter().any(|event| &event.message == message));
+ }
+}
+
+#[tokio::test]
+async fn sent_sequentially() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ let messages = vec![
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the expected events in the expected order
+
+ let mut events = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)));
+
+ for message in &messages {
+ let event = events
+ .next()
+ .expect_some("undelivered messages remaining")
+ .await;
+
+ assert_eq!(message, &event.message);
+ }
+}
+
+#[tokio::test]
+async fn expiring() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::user::create(&app, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Expire messages
+
+ app.messages()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring messages always succeeds");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .expect_some("a deleted message event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_expired() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::user::create(&app, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Expire messages
+
+ app.messages()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring messages always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .expect_some("a deleted message event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn deleting() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Delete the message
+
+ app.messages()
+ .delete(&sender, &message.id, &fixtures::now())
+ .await
+ .expect("deleting a valid message succeeds");
+
+ // Check for delete event
+ let _ = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .expect_some("a deleted message event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_deleted() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Delete the message
+
+ app.messages()
+ .delete(&sender, &message.id, &fixtures::now())
+ .await
+ .expect("deleting a valid message succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for delete event
+ let _ = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .expect_some("a deleted message event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_purged() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::user::create(&app, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Purge the message
+
+ app.messages()
+ .delete(&sender, &message.id, &fixtures::ancient())
+ .await
+ .expect("deleting a valid message succeeds");
+
+ app.messages()
+ .purge(&fixtures::now())
+ .await
+ .expect("purge always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for delete event
+
+ events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .expect_wait("no deleted message will be delivered")
+ .await;
+}
diff --git a/src/event/handlers/stream/test/mod.rs b/src/event/handlers/stream/test/mod.rs
new file mode 100644
index 0000000..df43deb
--- /dev/null
+++ b/src/event/handlers/stream/test/mod.rs
@@ -0,0 +1,8 @@
+mod channel;
+mod invite;
+mod message;
+mod resume;
+mod setup;
+mod token;
+
+use super::{QueryParams, Response, handler};
diff --git a/src/event/handlers/stream/test/resume.rs b/src/event/handlers/stream/test/resume.rs
new file mode 100644
index 0000000..34fee4d
--- /dev/null
+++ b/src/event/handlers/stream/test/resume.rs
@@ -0,0 +1,227 @@
+use std::future;
+
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::stream::{self, StreamExt as _};
+
+use crate::{
+ event::Sequenced as _,
+ test::fixtures::{self, future::Expect as _},
+};
+
+#[tokio::test]
+async fn resume() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ let later_messages = vec![
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let resume_at = {
+ // First subscription
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ let event = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(event.message == initial_message))
+ .next()
+ .expect_some("delivered event for initial message")
+ .await;
+
+ event.sequence()
+ };
+
+ // Resume after disconnect
+ let super::Response(resumed) = super::handler(
+ State(app),
+ subscriber,
+ Some(resume_at.into()),
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify final events
+
+ let mut events = resumed
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .zip(stream::iter(later_messages));
+
+ while let Some((event, message)) = events.next().expect_ready("event ready").await {
+ assert_eq!(message, event.message);
+ }
+}
+
+// This test verifies a real bug I hit developing the vector-of-sequences
+// approach to resuming events. A small omission caused the event IDs in a
+// resumed stream to _omit_ channels that were in the original stream until
+// those channels also appeared in the resumed stream.
+//
+// Clients would see something like
+// * In the original stream, Cfoo=5,Cbar=8
+// * In the resumed stream, Cfoo=6 (no Cbar sequence number)
+//
+// Disconnecting and reconnecting a second time, using event IDs from that
+// initial period of the first resume attempt, would then cause the second
+// resume attempt to restart all other channels from the beginning, and not
+// from where the first disconnection happened.
+//
+// As we have switched to a single global event sequence number, this scenario
+// can no longer arise, but this test is preserved because the actual behaviour
+// _is_ a valid way for clients to behave, and should work. We might as well
+// keep testing it.
+#[tokio::test]
+async fn serial_resume() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
+ let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let resume_at = {
+ let initial_messages = [
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
+ ];
+
+ // First subscription
+
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expected events
+
+ let events = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .zip(stream::iter(initial_messages))
+ .collect::<Vec<_>>()
+ .expect_ready("zipping a finite list of events is ready immediately")
+ .await;
+
+ assert!(
+ events
+ .iter()
+ .all(|(event, message)| message == &event.message)
+ );
+
+ let (event, _) = events.last().expect("this vec is non-empty");
+
+ // Take the last one's resume point
+
+ event.sequence()
+ };
+
+ // Resume after disconnect
+ let resume_at = {
+ let resume_messages = [
+ // Note that channel_b does not appear here. The buggy behaviour
+ // would be masked if channel_b happened to send a new message
+ // into the resumed event stream.
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ ];
+
+ // Second subscription
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expected events
+
+ let events = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .zip(stream::iter(resume_messages))
+ .collect::<Vec<_>>()
+ .expect_ready("zipping a finite list of events is ready immediately")
+ .await;
+
+ assert!(
+ events
+ .iter()
+ .all(|(event, message)| message == &event.message)
+ );
+
+ let (event, _) = events.last().expect("this vec is non-empty");
+
+ // Take the last one's resume point
+
+ event.sequence()
+ };
+
+ // Resume after disconnect a second time
+ {
+ // At this point, we can send on either channel and demonstrate the
+ // problem. The resume point should before both of these messages, but
+ // after _all_ prior messages.
+ let final_messages = [
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
+ ];
+
+ // Third subscription
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expected events
+
+ let events = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .zip(stream::iter(final_messages))
+ .collect::<Vec<_>>()
+ .expect_ready("zipping a finite list of events is ready immediately")
+ .await;
+
+ assert!(
+ events
+ .iter()
+ .all(|(event, message)| message == &event.message)
+ );
+ };
+}
diff --git a/src/event/handlers/stream/test/setup.rs b/src/event/handlers/stream/test/setup.rs
new file mode 100644
index 0000000..5335055
--- /dev/null
+++ b/src/event/handlers/stream/test/setup.rs
@@ -0,0 +1,47 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::test::fixtures::{self, future::Expect as _};
+
+// There's no test for this in subscribe-then-setup order because creating an
+// identity to subscribe with also completes initial setup, preventing the
+// test from running. That is also a can't-happen scenario in reality.
+#[tokio::test]
+async fn previously_completed() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Complete initial setup
+
+ let (name, password) = fixtures::user::propose();
+ let (owner, _) = app
+ .setup()
+ .initial(&name, &password, &fixtures::now())
+ .await
+ .expect("initial setup in an empty app succeeds");
+
+ // Subscribe to events
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Expect a login created event
+
+ let _ = events
+ .filter_map(fixtures::event::user)
+ .filter_map(fixtures::event::user::created)
+ .filter(|event| future::ready(event.user == owner))
+ .next()
+ .expect_some("a login created event is sent")
+ .await;
+}
diff --git a/src/event/handlers/stream/test/token.rs b/src/event/handlers/stream/test/token.rs
new file mode 100644
index 0000000..2008323
--- /dev/null
+++ b/src/event/handlers/stream/test/token.rs
@@ -0,0 +1,148 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::test::fixtures::{self, future::Expect as _};
+
+#[tokio::test]
+async fn terminates_on_token_expiry() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber_creds = fixtures::user::create_with_password(&app, &fixtures::now()).await;
+ let subscriber =
+ fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await;
+
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.tokens()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
+ .next()
+ .expect_none("end of stream")
+ .await;
+}
+
+#[tokio::test]
+async fn terminates_on_logout() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.tokens()
+ .logout(&subscriber.token)
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
+ .next()
+ .expect_none("end of stream")
+ .await;
+}
+
+#[tokio::test]
+async fn terminates_on_password_change() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe via the endpoint
+
+ let creds = fixtures::user::create_with_password(&app, &fixtures::now()).await;
+ let cookie = fixtures::cookie::logged_in(&app, &creds, &fixtures::now()).await;
+ let subscriber = fixtures::identity::from_cookie(&app, &cookie, &fixtures::now()).await;
+
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ let (_, password) = creds;
+ let to = fixtures::user::propose_password();
+ app.tokens()
+ .change_password(&subscriber.user, &password, &to, &fixtures::now())
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
+ .next()
+ .expect_none("end of stream")
+ .await;
+}