From 4e3d5ccac99b24934c972e088cd7eb02bb95df06 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 17 Jun 2025 02:11:45 -0400 Subject: Handlers are _named operations_, which can be exposed via routes. Each domain module that exposes handlers does so through a `handlers` child module, ideally as a top-level symbol that can be plugged directly into Axum's `MethodRouter`. Modules could make exceptions to this - kill the doctrinaire inside yourself, after all - but none of the API modules that actually exist need such exceptions, and consistency is useful. The related details of request types, URL types, response types, errors, &c &c are then organized into modules under `handlers`, along with their respective tests. --- src/event/handlers/mod.rs | 3 + src/event/handlers/stream/mod.rs | 85 +++++++ src/event/handlers/stream/test/channel.rs | 273 ++++++++++++++++++++ src/event/handlers/stream/test/invite.rs | 87 +++++++ src/event/handlers/stream/test/message.rs | 393 +++++++++++++++++++++++++++++ src/event/handlers/stream/test/mod.rs | 8 + src/event/handlers/stream/test/resume.rs | 227 +++++++++++++++++ src/event/handlers/stream/test/setup.rs | 47 ++++ src/event/handlers/stream/test/token.rs | 148 +++++++++++ src/event/mod.rs | 2 +- src/event/routes/get.rs | 82 ------- src/event/routes/mod.rs | 3 - src/event/routes/test/channel.rs | 276 --------------------- src/event/routes/test/invite.rs | 90 ------- src/event/routes/test/message.rs | 396 ------------------------------ src/event/routes/test/mod.rs | 6 - src/event/routes/test/resume.rs | 227 ----------------- src/event/routes/test/setup.rs | 50 ---- src/event/routes/test/token.rs | 151 ------------ 19 files changed, 1272 insertions(+), 1282 deletions(-) create mode 100644 src/event/handlers/mod.rs create mode 100644 src/event/handlers/stream/mod.rs create mode 100644 src/event/handlers/stream/test/channel.rs create mode 100644 src/event/handlers/stream/test/invite.rs create mode 100644 src/event/handlers/stream/test/message.rs create mode 100644 src/event/handlers/stream/test/mod.rs create mode 100644 src/event/handlers/stream/test/resume.rs create mode 100644 src/event/handlers/stream/test/setup.rs create mode 100644 src/event/handlers/stream/test/token.rs delete mode 100644 src/event/routes/get.rs delete mode 100644 src/event/routes/mod.rs delete mode 100644 src/event/routes/test/channel.rs delete mode 100644 src/event/routes/test/invite.rs delete mode 100644 src/event/routes/test/message.rs delete mode 100644 src/event/routes/test/mod.rs delete mode 100644 src/event/routes/test/resume.rs delete mode 100644 src/event/routes/test/setup.rs delete mode 100644 src/event/routes/test/token.rs (limited to 'src/event') diff --git a/src/event/handlers/mod.rs b/src/event/handlers/mod.rs new file mode 100644 index 0000000..22d988c --- /dev/null +++ b/src/event/handlers/mod.rs @@ -0,0 +1,3 @@ +mod stream; + +pub use stream::handler as stream; diff --git a/src/event/handlers/stream/mod.rs b/src/event/handlers/stream/mod.rs new file mode 100644 index 0000000..d0d3f08 --- /dev/null +++ b/src/event/handlers/stream/mod.rs @@ -0,0 +1,85 @@ +use axum::{ + extract::State, + response::{ + self, IntoResponse, + sse::{self, Sse}, + }, +}; +use axum_extra::extract::Query; +use futures::stream::{Stream, StreamExt as _}; + +use crate::{ + app::App, + error::{Internal, Unauthorized}, + event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId}, + token::{app::ValidateError, extract::Identity}, +}; + +#[cfg(test)] +mod test; + +pub async fn handler( + State(app): State, + identity: Identity, + last_event_id: Option>, + Query(query): Query, +) -> Result + std::fmt::Debug>, Error> { + let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner); + + let stream = app.events().subscribe(resume_at).await?; + let stream = app.tokens().limit_stream(identity.token, stream).await?; + + Ok(Response(stream)) +} + +#[derive(serde::Deserialize)] +pub struct QueryParams { + pub resume_point: Sequence, +} + +#[derive(Debug)] +pub struct Response(pub S); + +impl IntoResponse for Response +where + S: Stream + Send + 'static, +{ + fn into_response(self) -> response::Response { + let Self(stream) = self; + let stream = stream.map(sse::Event::try_from); + let heartbeat = match Heartbeat.try_into().map_err(Internal::from) { + Ok(heartbeat) => heartbeat, + Err(err) => return err.into_response(), + }; + Sse::new(stream).keep_alive(heartbeat).into_response() + } +} + +impl TryFrom for sse::Event { + type Error = serde_json::Error; + + fn try_from(event: Event) -> Result { + let id = serde_json::to_string(&event.sequence())?; + let data = serde_json::to_string_pretty(&event)?; + + let event = Self::default().id(id).data(data); + + Ok(event) + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum Error { + Subscribe(#[from] app::Error), + Validate(#[from] ValidateError), +} + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + match self { + Self::Validate(ValidateError::InvalidToken) => Unauthorized.into_response(), + other => Internal::from(other).into_response(), + } + } +} diff --git a/src/event/handlers/stream/test/channel.rs b/src/event/handlers/stream/test/channel.rs new file mode 100644 index 0000000..187c3c3 --- /dev/null +++ b/src/event/handlers/stream/test/channel.rs @@ -0,0 +1,273 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::test::fixtures::{self, future::Expect as _}; + +#[tokio::test] +async fn creating() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Create a channel + + let name = fixtures::channel::propose(); + let channel = app + .channels() + .create(&name, &fixtures::now()) + .await + .expect("creating a channel succeeds"); + + // Verify channel created event + + events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .filter(|event| future::ready(event.channel == channel)) + .next() + .expect_some("channel created event is delivered") + .await; +} + +#[tokio::test] +async fn previously_created() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Create a channel + + let name = fixtures::channel::propose(); + let channel = app + .channels() + .create(&name, &fixtures::now()) + .await + .expect("creating a channel succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify channel created event + + let _ = events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .filter(|event| future::ready(event.channel == channel)) + .next() + .expect_some("channel created event is delivered") + .await; +} + +#[tokio::test] +async fn expiring() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Expire channels + + app.channels() + .expire(&fixtures::now()) + .await + .expect("expiring channels always succeeds"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .expect_some("a deleted channel event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Expire channels + + app.channels() + .expire(&fixtures::now()) + .await + .expect("expiring channels always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .expect_some("a deleted channel event will be delivered") + .await; +} + +#[tokio::test] +async fn deleting() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Delete the channel + + app.channels() + .delete(&channel.id, &fixtures::now()) + .await + .expect("deleting a valid channel succeeds"); + + // Check for delete event + let _ = events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .expect_some("a deleted channel event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Delete the channel + + app.channels() + .delete(&channel.id, &fixtures::now()) + .await + .expect("deleting a valid channel succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .expect_some("a deleted channel event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_purged() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Delete and purge the channel + + app.channels() + .delete(&channel.id, &fixtures::ancient()) + .await + .expect("deleting a valid channel succeeds"); + + app.channels() + .purge(&fixtures::now()) + .await + .expect("purging channels always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expiry event + events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .expect_wait("deleted channel events not delivered") + .await; +} diff --git a/src/event/handlers/stream/test/invite.rs b/src/event/handlers/stream/test/invite.rs new file mode 100644 index 0000000..c8e12fb --- /dev/null +++ b/src/event/handlers/stream/test/invite.rs @@ -0,0 +1,87 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::test::fixtures::{self, future::Expect as _}; + +#[tokio::test] +async fn accepting_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::user::create(&app, &fixtures::now()).await; + let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Accept the invite + + let (name, password) = fixtures::user::propose(); + let (joiner, _) = app + .invites() + .accept(&invite.id, &name, &password, &fixtures::now()) + .await + .expect("accepting an invite succeeds"); + + // Expect a login created event + + let _ = events + .filter_map(fixtures::event::user) + .filter_map(fixtures::event::user::created) + .filter(|event| future::ready(event.user == joiner)) + .next() + .expect_some("a login created event is sent") + .await; +} + +#[tokio::test] +async fn previously_accepted_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::user::create(&app, &fixtures::now()).await; + let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Accept the invite + + let (name, password) = fixtures::user::propose(); + let (joiner, _) = app + .invites() + .accept(&invite.id, &name, &password, &fixtures::now()) + .await + .expect("accepting an invite succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Expect a login created event + + let _ = events + .filter_map(fixtures::event::user) + .filter_map(fixtures::event::user::created) + .filter(|event| future::ready(event.user == joiner)) + .next() + .expect_some("a login created event is sent") + .await; +} diff --git a/src/event/handlers/stream/test/message.rs b/src/event/handlers/stream/test/message.rs new file mode 100644 index 0000000..a80c896 --- /dev/null +++ b/src/event/handlers/stream/test/message.rs @@ -0,0 +1,393 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{ + future, + stream::{self, StreamExt as _}, +}; + +use crate::test::fixtures::{self, future::Expect as _}; + +#[tokio::test] +async fn sending() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Send a message + + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let message = app + .messages() + .send( + &channel.id, + &sender, + &fixtures::now(), + &fixtures::message::propose(), + ) + .await + .expect("sending a message succeeds"); + + // Verify that an event is delivered + + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(event.message == message)) + .next() + .expect_some("delivered message sent event") + .await; +} + +#[tokio::test] +async fn previously_sent() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Send a message + + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let message = app + .messages() + .send( + &channel.id, + &sender, + &fixtures::now(), + &fixtures::message::propose(), + ) + .await + .expect("sending a message succeeds"); + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify that an event is delivered + + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(event.message == message)) + .next() + .expect_some("delivered message sent event") + .await; +} + +#[tokio::test] +async fn sent_in_multiple_channels() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + let channels = [ + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, + ]; + + let messages = stream::iter(channels) + .then(|channel| { + let app = app.clone(); + let sender = sender.clone(); + let channel = channel.clone(); + async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } + }) + .collect::>() + .await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify the structure of the response. + + let events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .take(messages.len()) + .collect::>() + .expect_ready("events ready") + .await; + + for message in &messages { + assert!(events.iter().any(|event| &event.message == message)); + } +} + +#[tokio::test] +async fn sent_sequentially() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + let messages = vec![ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify the expected events in the expected order + + let mut events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))); + + for message in &messages { + let event = events + .next() + .expect_some("undelivered messages remaining") + .await; + + assert_eq!(message, &event.message); + } +} + +#[tokio::test] +async fn expiring() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let sender = fixtures::user::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Expire messages + + app.messages() + .expire(&fixtures::now()) + .await + .expect("expiring messages always succeeds"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_some("a deleted message event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let sender = fixtures::user::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Expire messages + + app.messages() + .expire(&fixtures::now()) + .await + .expect("expiring messages always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_some("a deleted message event will be delivered") + .await; +} + +#[tokio::test] +async fn deleting() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Delete the message + + app.messages() + .delete(&sender, &message.id, &fixtures::now()) + .await + .expect("deleting a valid message succeeds"); + + // Check for delete event + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_some("a deleted message event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Delete the message + + app.messages() + .delete(&sender, &message.id, &fixtures::now()) + .await + .expect("deleting a valid message succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for delete event + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_some("a deleted message event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_purged() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let sender = fixtures::user::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Purge the message + + app.messages() + .delete(&sender, &message.id, &fixtures::ancient()) + .await + .expect("deleting a valid message succeeds"); + + app.messages() + .purge(&fixtures::now()) + .await + .expect("purge always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for delete event + + events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_wait("no deleted message will be delivered") + .await; +} diff --git a/src/event/handlers/stream/test/mod.rs b/src/event/handlers/stream/test/mod.rs new file mode 100644 index 0000000..df43deb --- /dev/null +++ b/src/event/handlers/stream/test/mod.rs @@ -0,0 +1,8 @@ +mod channel; +mod invite; +mod message; +mod resume; +mod setup; +mod token; + +use super::{QueryParams, Response, handler}; diff --git a/src/event/handlers/stream/test/resume.rs b/src/event/handlers/stream/test/resume.rs new file mode 100644 index 0000000..34fee4d --- /dev/null +++ b/src/event/handlers/stream/test/resume.rs @@ -0,0 +1,227 @@ +use std::future; + +use axum::extract::State; +use axum_extra::extract::Query; +use futures::stream::{self, StreamExt as _}; + +use crate::{ + event::Sequenced as _, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + let later_messages = vec![ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let resume_at = { + // First subscription + let super::Response(events) = super::handler( + State(app.clone()), + subscriber.clone(), + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + let event = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(event.message == initial_message)) + .next() + .expect_some("delivered event for initial message") + .await; + + event.sequence() + }; + + // Resume after disconnect + let super::Response(resumed) = super::handler( + State(app), + subscriber, + Some(resume_at.into()), + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify final events + + let mut events = resumed + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(later_messages)); + + while let Some((event, message)) = events.next().expect_ready("event ready").await { + assert_eq!(message, event.message); + } +} + +// This test verifies a real bug I hit developing the vector-of-sequences +// approach to resuming events. A small omission caused the event IDs in a +// resumed stream to _omit_ channels that were in the original stream until +// those channels also appeared in the resumed stream. +// +// Clients would see something like +// * In the original stream, Cfoo=5,Cbar=8 +// * In the resumed stream, Cfoo=6 (no Cbar sequence number) +// +// Disconnecting and reconnecting a second time, using event IDs from that +// initial period of the first resume attempt, would then cause the second +// resume attempt to restart all other channels from the beginning, and not +// from where the first disconnection happened. +// +// As we have switched to a single global event sequence number, this scenario +// can no longer arise, but this test is preserved because the actual behaviour +// _is_ a valid way for clients to behave, and should work. We might as well +// keep testing it. +#[tokio::test] +async fn serial_resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; + let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let resume_at = { + let initial_messages = [ + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + ]; + + // First subscription + + let super::Response(events) = super::handler( + State(app.clone()), + subscriber.clone(), + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(initial_messages)) + .collect::>() + .expect_ready("zipping a finite list of events is ready immediately") + .await; + + assert!( + events + .iter() + .all(|(event, message)| message == &event.message) + ); + + let (event, _) = events.last().expect("this vec is non-empty"); + + // Take the last one's resume point + + event.sequence() + }; + + // Resume after disconnect + let resume_at = { + let resume_messages = [ + // Note that channel_b does not appear here. The buggy behaviour + // would be masked if channel_b happened to send a new message + // into the resumed event stream. + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + ]; + + // Second subscription + let super::Response(events) = super::handler( + State(app.clone()), + subscriber.clone(), + Some(resume_at.into()), + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(resume_messages)) + .collect::>() + .expect_ready("zipping a finite list of events is ready immediately") + .await; + + assert!( + events + .iter() + .all(|(event, message)| message == &event.message) + ); + + let (event, _) = events.last().expect("this vec is non-empty"); + + // Take the last one's resume point + + event.sequence() + }; + + // Resume after disconnect a second time + { + // At this point, we can send on either channel and demonstrate the + // problem. The resume point should before both of these messages, but + // after _all_ prior messages. + let final_messages = [ + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + ]; + + // Third subscription + let super::Response(events) = super::handler( + State(app.clone()), + subscriber.clone(), + Some(resume_at.into()), + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(final_messages)) + .collect::>() + .expect_ready("zipping a finite list of events is ready immediately") + .await; + + assert!( + events + .iter() + .all(|(event, message)| message == &event.message) + ); + }; +} diff --git a/src/event/handlers/stream/test/setup.rs b/src/event/handlers/stream/test/setup.rs new file mode 100644 index 0000000..5335055 --- /dev/null +++ b/src/event/handlers/stream/test/setup.rs @@ -0,0 +1,47 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::test::fixtures::{self, future::Expect as _}; + +// There's no test for this in subscribe-then-setup order because creating an +// identity to subscribe with also completes initial setup, preventing the +// test from running. That is also a can't-happen scenario in reality. +#[tokio::test] +async fn previously_completed() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Complete initial setup + + let (name, password) = fixtures::user::propose(); + let (owner, _) = app + .setup() + .initial(&name, &password, &fixtures::now()) + .await + .expect("initial setup in an empty app succeeds"); + + // Subscribe to events + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Expect a login created event + + let _ = events + .filter_map(fixtures::event::user) + .filter_map(fixtures::event::user::created) + .filter(|event| future::ready(event.user == owner)) + .next() + .expect_some("a login created event is sent") + .await; +} diff --git a/src/event/handlers/stream/test/token.rs b/src/event/handlers/stream/test/token.rs new file mode 100644 index 0000000..2008323 --- /dev/null +++ b/src/event/handlers/stream/test/token.rs @@ -0,0 +1,148 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::test::fixtures::{self, future::Expect as _}; + +#[tokio::test] +async fn terminates_on_token_expiry() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe via the endpoint + + let subscriber_creds = fixtures::user::create_with_password(&app, &fixtures::now()).await; + let subscriber = + fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; + + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify the resulting stream's behaviour + + app.tokens() + .expire(&fixtures::now()) + .await + .expect("expiring tokens succeeds"); + + // These should not be delivered. + let messages = [ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) + .next() + .expect_none("end of stream") + .await; +} + +#[tokio::test] +async fn terminates_on_logout() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe via the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let super::Response(events) = super::handler( + State(app.clone()), + subscriber.clone(), + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify the resulting stream's behaviour + + app.tokens() + .logout(&subscriber.token) + .await + .expect("expiring tokens succeeds"); + + // These should not be delivered. + + let messages = [ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) + .next() + .expect_none("end of stream") + .await; +} + +#[tokio::test] +async fn terminates_on_password_change() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe via the endpoint + + let creds = fixtures::user::create_with_password(&app, &fixtures::now()).await; + let cookie = fixtures::cookie::logged_in(&app, &creds, &fixtures::now()).await; + let subscriber = fixtures::identity::from_cookie(&app, &cookie, &fixtures::now()).await; + + let super::Response(events) = super::handler( + State(app.clone()), + subscriber.clone(), + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify the resulting stream's behaviour + + let (_, password) = creds; + let to = fixtures::user::propose_password(); + app.tokens() + .change_password(&subscriber.user, &password, &to, &fixtures::now()) + .await + .expect("expiring tokens succeeds"); + + // These should not be delivered. + + let messages = [ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) + .next() + .expect_none("end of stream") + .await; +} diff --git a/src/event/mod.rs b/src/event/mod.rs index ff30dc7..6657243 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -7,8 +7,8 @@ use crate::{channel, message, user}; pub mod app; mod broadcaster; mod extract; +pub mod handlers; pub mod repo; -pub mod routes; mod sequence; pub use self::{ diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs deleted file mode 100644 index f6c91fa..0000000 --- a/src/event/routes/get.rs +++ /dev/null @@ -1,82 +0,0 @@ -use axum::{ - extract::State, - response::{ - self, IntoResponse, - sse::{self, Sse}, - }, -}; -use axum_extra::extract::Query; -use futures::stream::{Stream, StreamExt as _}; - -use crate::{ - app::App, - error::{Internal, Unauthorized}, - event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId}, - token::{app::ValidateError, extract::Identity}, -}; - -pub async fn handler( - State(app): State, - identity: Identity, - last_event_id: Option>, - Query(query): Query, -) -> Result + std::fmt::Debug>, Error> { - let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner); - - let stream = app.events().subscribe(resume_at).await?; - let stream = app.tokens().limit_stream(identity.token, stream).await?; - - Ok(Response(stream)) -} - -#[derive(serde::Deserialize)] -pub struct QueryParams { - pub resume_point: Sequence, -} - -#[derive(Debug)] -pub struct Response(pub S); - -impl IntoResponse for Response -where - S: Stream + Send + 'static, -{ - fn into_response(self) -> response::Response { - let Self(stream) = self; - let stream = stream.map(sse::Event::try_from); - let heartbeat = match Heartbeat.try_into().map_err(Internal::from) { - Ok(heartbeat) => heartbeat, - Err(err) => return err.into_response(), - }; - Sse::new(stream).keep_alive(heartbeat).into_response() - } -} - -impl TryFrom for sse::Event { - type Error = serde_json::Error; - - fn try_from(event: Event) -> Result { - let id = serde_json::to_string(&event.sequence())?; - let data = serde_json::to_string_pretty(&event)?; - - let event = Self::default().id(id).data(data); - - Ok(event) - } -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum Error { - Subscribe(#[from] app::Error), - Validate(#[from] ValidateError), -} - -impl IntoResponse for Error { - fn into_response(self) -> response::Response { - match self { - Self::Validate(ValidateError::InvalidToken) => Unauthorized.into_response(), - other => Internal::from(other).into_response(), - } - } -} diff --git a/src/event/routes/mod.rs b/src/event/routes/mod.rs deleted file mode 100644 index 60ad5d8..0000000 --- a/src/event/routes/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod get; -#[cfg(test)] -mod test; diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs deleted file mode 100644 index 0695ab1..0000000 --- a/src/event/routes/test/channel.rs +++ /dev/null @@ -1,276 +0,0 @@ -use axum::extract::State; -use axum_extra::extract::Query; -use futures::{future, stream::StreamExt as _}; - -use crate::{ - event::routes::get, - test::fixtures::{self, future::Expect as _}, -}; - -#[tokio::test] -async fn creating() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Create a channel - - let name = fixtures::channel::propose(); - let channel = app - .channels() - .create(&name, &fixtures::now()) - .await - .expect("creating a channel succeeds"); - - // Verify channel created event - - events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) - .filter(|event| future::ready(event.channel == channel)) - .next() - .expect_some("channel created event is delivered") - .await; -} - -#[tokio::test] -async fn previously_created() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Create a channel - - let name = fixtures::channel::propose(); - let channel = app - .channels() - .create(&name, &fixtures::now()) - .await - .expect("creating a channel succeeds"); - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Verify channel created event - - let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) - .filter(|event| future::ready(event.channel == channel)) - .next() - .expect_some("channel created event is delivered") - .await; -} - -#[tokio::test] -async fn expiring() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Expire channels - - app.channels() - .expire(&fixtures::now()) - .await - .expect("expiring channels always succeeds"); - - // Check for expiry event - let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) - .next() - .expect_some("a deleted channel event will be delivered") - .await; -} - -#[tokio::test] -async fn previously_expired() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Expire channels - - app.channels() - .expire(&fixtures::now()) - .await - .expect("expiring channels always succeeds"); - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expiry event - let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) - .next() - .expect_some("a deleted channel event will be delivered") - .await; -} - -#[tokio::test] -async fn deleting() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Delete the channel - - app.channels() - .delete(&channel.id, &fixtures::now()) - .await - .expect("deleting a valid channel succeeds"); - - // Check for delete event - let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) - .next() - .expect_some("a deleted channel event will be delivered") - .await; -} - -#[tokio::test] -async fn previously_deleted() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Delete the channel - - app.channels() - .delete(&channel.id, &fixtures::now()) - .await - .expect("deleting a valid channel succeeds"); - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expiry event - let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) - .next() - .expect_some("a deleted channel event will be delivered") - .await; -} - -#[tokio::test] -async fn previously_purged() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Delete and purge the channel - - app.channels() - .delete(&channel.id, &fixtures::ancient()) - .await - .expect("deleting a valid channel succeeds"); - - app.channels() - .purge(&fixtures::now()) - .await - .expect("purging channels always succeeds"); - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expiry event - events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) - .next() - .expect_wait("deleted channel events not delivered") - .await; -} diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs deleted file mode 100644 index 1d1bec6..0000000 --- a/src/event/routes/test/invite.rs +++ /dev/null @@ -1,90 +0,0 @@ -use axum::extract::State; -use axum_extra::extract::Query; -use futures::{future, stream::StreamExt as _}; - -use crate::{ - event::routes::get, - test::fixtures::{self, future::Expect as _}, -}; - -#[tokio::test] -async fn accepting_invite() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let issuer = fixtures::user::create(&app, &fixtures::now()).await; - let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Accept the invite - - let (name, password) = fixtures::user::propose(); - let (joiner, _) = app - .invites() - .accept(&invite.id, &name, &password, &fixtures::now()) - .await - .expect("accepting an invite succeeds"); - - // Expect a login created event - - let _ = events - .filter_map(fixtures::event::user) - .filter_map(fixtures::event::user::created) - .filter(|event| future::ready(event.user == joiner)) - .next() - .expect_some("a login created event is sent") - .await; -} - -#[tokio::test] -async fn previously_accepted_invite() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let issuer = fixtures::user::create(&app, &fixtures::now()).await; - let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Accept the invite - - let (name, password) = fixtures::user::propose(); - let (joiner, _) = app - .invites() - .accept(&invite.id, &name, &password, &fixtures::now()) - .await - .expect("accepting an invite succeeds"); - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Expect a login created event - - let _ = events - .filter_map(fixtures::event::user) - .filter_map(fixtures::event::user::created) - .filter(|event| future::ready(event.user == joiner)) - .next() - .expect_some("a login created event is sent") - .await; -} diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs deleted file mode 100644 index 84a3aec..0000000 --- a/src/event/routes/test/message.rs +++ /dev/null @@ -1,396 +0,0 @@ -use axum::extract::State; -use axum_extra::extract::Query; -use futures::{ - future, - stream::{self, StreamExt as _}, -}; - -use crate::{ - event::routes::get, - test::fixtures::{self, future::Expect as _}, -}; - -#[tokio::test] -async fn sending() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Send a message - - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let message = app - .messages() - .send( - &channel.id, - &sender, - &fixtures::now(), - &fixtures::message::propose(), - ) - .await - .expect("sending a message succeeds"); - - // Verify that an event is delivered - - let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .filter(|event| future::ready(event.message == message)) - .next() - .expect_some("delivered message sent event") - .await; -} - -#[tokio::test] -async fn previously_sent() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Send a message - - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let message = app - .messages() - .send( - &channel.id, - &sender, - &fixtures::now(), - &fixtures::message::propose(), - ) - .await - .expect("sending a message succeeds"); - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Verify that an event is delivered - - let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .filter(|event| future::ready(event.message == message)) - .next() - .expect_some("delivered message sent event") - .await; -} - -#[tokio::test] -async fn sent_in_multiple_channels() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - let channels = [ - fixtures::channel::create(&app, &fixtures::now()).await, - fixtures::channel::create(&app, &fixtures::now()).await, - ]; - - let messages = stream::iter(channels) - .then(|channel| { - let app = app.clone(); - let sender = sender.clone(); - let channel = channel.clone(); - async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } - }) - .collect::>() - .await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .take(messages.len()) - .collect::>() - .expect_ready("events ready") - .await; - - for message in &messages { - assert!(events.iter().any(|event| &event.message == message)); - } -} - -#[tokio::test] -async fn sent_sequentially() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - let messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Verify the expected events in the expected order - - let mut events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))); - - for message in &messages { - let event = events - .next() - .expect_some("undelivered messages remaining") - .await; - - assert_eq!(message, &event.message); - } -} - -#[tokio::test] -async fn expiring() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Expire messages - - app.messages() - .expire(&fixtures::now()) - .await - .expect("expiring messages always succeeds"); - - // Check for expiry event - let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) - .filter(|event| future::ready(event.id == message.id)) - .next() - .expect_some("a deleted message event will be delivered") - .await; -} - -#[tokio::test] -async fn previously_expired() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Expire messages - - app.messages() - .expire(&fixtures::now()) - .await - .expect("expiring messages always succeeds"); - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expiry event - let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) - .filter(|event| future::ready(event.id == message.id)) - .next() - .expect_some("a deleted message event will be delivered") - .await; -} - -#[tokio::test] -async fn deleting() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Delete the message - - app.messages() - .delete(&sender, &message.id, &fixtures::now()) - .await - .expect("deleting a valid message succeeds"); - - // Check for delete event - let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) - .filter(|event| future::ready(event.id == message.id)) - .next() - .expect_some("a deleted message event will be delivered") - .await; -} - -#[tokio::test] -async fn previously_deleted() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Delete the message - - app.messages() - .delete(&sender, &message.id, &fixtures::now()) - .await - .expect("deleting a valid message succeeds"); - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for delete event - let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) - .filter(|event| future::ready(event.id == message.id)) - .next() - .expect_some("a deleted message event will be delivered") - .await; -} - -#[tokio::test] -async fn previously_purged() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Purge the message - - app.messages() - .delete(&sender, &message.id, &fixtures::ancient()) - .await - .expect("deleting a valid message succeeds"); - - app.messages() - .purge(&fixtures::now()) - .await - .expect("purge always succeeds"); - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for delete event - - events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) - .filter(|event| future::ready(event.id == message.id)) - .next() - .expect_wait("no deleted message will be delivered") - .await; -} diff --git a/src/event/routes/test/mod.rs b/src/event/routes/test/mod.rs deleted file mode 100644 index e7e35f1..0000000 --- a/src/event/routes/test/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -mod channel; -mod invite; -mod message; -mod resume; -mod setup; -mod token; diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs deleted file mode 100644 index 633eae3..0000000 --- a/src/event/routes/test/resume.rs +++ /dev/null @@ -1,227 +0,0 @@ -use std::future; - -use axum::extract::State; -use axum_extra::extract::Query; -use futures::stream::{self, StreamExt as _}; - -use crate::{ - event::{Sequenced as _, routes::get}, - test::fixtures::{self, future::Expect as _}, -}; - -#[tokio::test] -async fn resume() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - let later_messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let resume_at = { - // First subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - let event = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .filter(|event| future::ready(event.message == initial_message)) - .next() - .expect_some("delivered event for initial message") - .await; - - event.sequence() - }; - - // Resume after disconnect - let get::Response(resumed) = get::handler( - State(app), - subscriber, - Some(resume_at.into()), - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Verify final events - - let mut events = resumed - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .zip(stream::iter(later_messages)); - - while let Some((event, message)) = events.next().expect_ready("event ready").await { - assert_eq!(message, event.message); - } -} - -// This test verifies a real bug I hit developing the vector-of-sequences -// approach to resuming events. A small omission caused the event IDs in a -// resumed stream to _omit_ channels that were in the original stream until -// those channels also appeared in the resumed stream. -// -// Clients would see something like -// * In the original stream, Cfoo=5,Cbar=8 -// * In the resumed stream, Cfoo=6 (no Cbar sequence number) -// -// Disconnecting and reconnecting a second time, using event IDs from that -// initial period of the first resume attempt, would then cause the second -// resume attempt to restart all other channels from the beginning, and not -// from where the first disconnection happened. -// -// As we have switched to a single global event sequence number, this scenario -// can no longer arise, but this test is preserved because the actual behaviour -// _is_ a valid way for clients to behave, and should work. We might as well -// keep testing it. -#[tokio::test] -async fn serial_resume() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; - let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let resume_at = { - let initial_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, - ]; - - // First subscription - - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expected events - - let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .zip(stream::iter(initial_messages)) - .collect::>() - .expect_ready("zipping a finite list of events is ready immediately") - .await; - - assert!( - events - .iter() - .all(|(event, message)| message == &event.message) - ); - - let (event, _) = events.last().expect("this vec is non-empty"); - - // Take the last one's resume point - - event.sequence() - }; - - // Resume after disconnect - let resume_at = { - let resume_messages = [ - // Note that channel_b does not appear here. The buggy behaviour - // would be masked if channel_b happened to send a new message - // into the resumed event stream. - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - ]; - - // Second subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - Some(resume_at.into()), - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expected events - - let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .zip(stream::iter(resume_messages)) - .collect::>() - .expect_ready("zipping a finite list of events is ready immediately") - .await; - - assert!( - events - .iter() - .all(|(event, message)| message == &event.message) - ); - - let (event, _) = events.last().expect("this vec is non-empty"); - - // Take the last one's resume point - - event.sequence() - }; - - // Resume after disconnect a second time - { - // At this point, we can send on either channel and demonstrate the - // problem. The resume point should before both of these messages, but - // after _all_ prior messages. - let final_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, - ]; - - // Third subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - Some(resume_at.into()), - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expected events - - let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .zip(stream::iter(final_messages)) - .collect::>() - .expect_ready("zipping a finite list of events is ready immediately") - .await; - - assert!( - events - .iter() - .all(|(event, message)| message == &event.message) - ); - }; -} diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs deleted file mode 100644 index 1170fe4..0000000 --- a/src/event/routes/test/setup.rs +++ /dev/null @@ -1,50 +0,0 @@ -use axum::extract::State; -use axum_extra::extract::Query; -use futures::{future, stream::StreamExt as _}; - -use crate::{ - event::routes::get, - test::fixtures::{self, future::Expect as _}, -}; - -// There's no test for this in subscribe-then-setup order because creating an -// identity to subscribe with also completes initial setup, preventing the -// test from running. That is also a can't-happen scenario in reality. -#[tokio::test] -async fn previously_completed() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Complete initial setup - - let (name, password) = fixtures::user::propose(); - let (owner, _) = app - .setup() - .initial(&name, &password, &fixtures::now()) - .await - .expect("initial setup in an empty app succeeds"); - - // Subscribe to events - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Expect a login created event - - let _ = events - .filter_map(fixtures::event::user) - .filter_map(fixtures::event::user::created) - .filter(|event| future::ready(event.user == owner)) - .next() - .expect_some("a login created event is sent") - .await; -} diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs deleted file mode 100644 index a467de5..0000000 --- a/src/event/routes/test/token.rs +++ /dev/null @@ -1,151 +0,0 @@ -use axum::extract::State; -use axum_extra::extract::Query; -use futures::{future, stream::StreamExt as _}; - -use crate::{ - event::routes::get, - test::fixtures::{self, future::Expect as _}, -}; - -#[tokio::test] -async fn terminates_on_token_expiry() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Subscribe via the endpoint - - let subscriber_creds = fixtures::user::create_with_password(&app, &fixtures::now()).await; - let subscriber = - fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; - - let get::Response(events) = get::handler( - State(app.clone()), - subscriber, - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Verify the resulting stream's behaviour - - app.tokens() - .expire(&fixtures::now()) - .await - .expect("expiring tokens succeeds"); - - // These should not be delivered. - let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) - .next() - .expect_none("end of stream") - .await; -} - -#[tokio::test] -async fn terminates_on_logout() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Subscribe via the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Verify the resulting stream's behaviour - - app.tokens() - .logout(&subscriber.token) - .await - .expect("expiring tokens succeeds"); - - // These should not be delivered. - - let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) - .next() - .expect_none("end of stream") - .await; -} - -#[tokio::test] -async fn terminates_on_password_change() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Subscribe via the endpoint - - let creds = fixtures::user::create_with_password(&app, &fixtures::now()).await; - let cookie = fixtures::cookie::logged_in(&app, &creds, &fixtures::now()).await; - let subscriber = fixtures::identity::from_cookie(&app, &cookie, &fixtures::now()).await; - - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query(get::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Verify the resulting stream's behaviour - - let (_, password) = creds; - let to = fixtures::user::propose_password(); - app.tokens() - .change_password(&subscriber.user, &password, &to, &fixtures::now()) - .await - .expect("expiring tokens succeeds"); - - // These should not be delivered. - - let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) - .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) - .next() - .expect_none("end of stream") - .await; -} -- cgit v1.2.3