summaryrefslogtreecommitdiff
path: root/src/event/routes
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2025-06-17 02:11:45 -0400
committerOwen Jacobson <owen@grimoire.ca>2025-06-18 18:31:40 -0400
commit4e3d5ccac99b24934c972e088cd7eb02bb95df06 (patch)
treec94f5a42f7e734b81892c1289a1d2b566706ba7c /src/event/routes
parent5ed96f8e8b9d9f19ee249f5c73a5a21ef6bca09f (diff)
Handlers are _named operations_, which can be exposed via routes.
Each domain module that exposes handlers does so through a `handlers` child module, ideally as a top-level symbol that can be plugged directly into Axum's `MethodRouter`. Modules could make exceptions to this - kill the doctrinaire inside yourself, after all - but none of the API modules that actually exist need such exceptions, and consistency is useful. The related details of request types, URL types, response types, errors, &c &c are then organized into modules under `handlers`, along with their respective tests.
Diffstat (limited to 'src/event/routes')
-rw-r--r--src/event/routes/get.rs82
-rw-r--r--src/event/routes/mod.rs3
-rw-r--r--src/event/routes/test/channel.rs276
-rw-r--r--src/event/routes/test/invite.rs90
-rw-r--r--src/event/routes/test/message.rs396
-rw-r--r--src/event/routes/test/mod.rs6
-rw-r--r--src/event/routes/test/resume.rs227
-rw-r--r--src/event/routes/test/setup.rs50
-rw-r--r--src/event/routes/test/token.rs151
9 files changed, 0 insertions, 1281 deletions
diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs
deleted file mode 100644
index f6c91fa..0000000
--- a/src/event/routes/get.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-use axum::{
- extract::State,
- response::{
- self, IntoResponse,
- sse::{self, Sse},
- },
-};
-use axum_extra::extract::Query;
-use futures::stream::{Stream, StreamExt as _};
-
-use crate::{
- app::App,
- error::{Internal, Unauthorized},
- event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId},
- token::{app::ValidateError, extract::Identity},
-};
-
-pub async fn handler(
- State(app): State<App>,
- identity: Identity,
- last_event_id: Option<LastEventId<Sequence>>,
- Query(query): Query<QueryParams>,
-) -> Result<Response<impl Stream<Item = Event> + std::fmt::Debug>, Error> {
- let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner);
-
- let stream = app.events().subscribe(resume_at).await?;
- let stream = app.tokens().limit_stream(identity.token, stream).await?;
-
- Ok(Response(stream))
-}
-
-#[derive(serde::Deserialize)]
-pub struct QueryParams {
- pub resume_point: Sequence,
-}
-
-#[derive(Debug)]
-pub struct Response<S>(pub S);
-
-impl<S> IntoResponse for Response<S>
-where
- S: Stream<Item = Event> + Send + 'static,
-{
- fn into_response(self) -> response::Response {
- let Self(stream) = self;
- let stream = stream.map(sse::Event::try_from);
- let heartbeat = match Heartbeat.try_into().map_err(Internal::from) {
- Ok(heartbeat) => heartbeat,
- Err(err) => return err.into_response(),
- };
- Sse::new(stream).keep_alive(heartbeat).into_response()
- }
-}
-
-impl TryFrom<Event> for sse::Event {
- type Error = serde_json::Error;
-
- fn try_from(event: Event) -> Result<Self, Self::Error> {
- let id = serde_json::to_string(&event.sequence())?;
- let data = serde_json::to_string_pretty(&event)?;
-
- let event = Self::default().id(id).data(data);
-
- Ok(event)
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub enum Error {
- Subscribe(#[from] app::Error),
- Validate(#[from] ValidateError),
-}
-
-impl IntoResponse for Error {
- fn into_response(self) -> response::Response {
- match self {
- Self::Validate(ValidateError::InvalidToken) => Unauthorized.into_response(),
- other => Internal::from(other).into_response(),
- }
- }
-}
diff --git a/src/event/routes/mod.rs b/src/event/routes/mod.rs
deleted file mode 100644
index 60ad5d8..0000000
--- a/src/event/routes/mod.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-pub mod get;
-#[cfg(test)]
-mod test;
diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs
deleted file mode 100644
index 0695ab1..0000000
--- a/src/event/routes/test/channel.rs
+++ /dev/null
@@ -1,276 +0,0 @@
-use axum::extract::State;
-use axum_extra::extract::Query;
-use futures::{future, stream::StreamExt as _};
-
-use crate::{
- event::routes::get,
- test::fixtures::{self, future::Expect as _},
-};
-
-#[tokio::test]
-async fn creating() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Create a channel
-
- let name = fixtures::channel::propose();
- let channel = app
- .channels()
- .create(&name, &fixtures::now())
- .await
- .expect("creating a channel succeeds");
-
- // Verify channel created event
-
- events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
- .filter(|event| future::ready(event.channel == channel))
- .next()
- .expect_some("channel created event is delivered")
- .await;
-}
-
-#[tokio::test]
-async fn previously_created() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Create a channel
-
- let name = fixtures::channel::propose();
- let channel = app
- .channels()
- .create(&name, &fixtures::now())
- .await
- .expect("creating a channel succeeds");
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify channel created event
-
- let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
- .filter(|event| future::ready(event.channel == channel))
- .next()
- .expect_some("channel created event is delivered")
- .await;
-}
-
-#[tokio::test]
-async fn expiring() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Expire channels
-
- app.channels()
- .expire(&fixtures::now())
- .await
- .expect("expiring channels always succeeds");
-
- // Check for expiry event
- let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
- .filter(|event| future::ready(event.id == channel.id))
- .next()
- .expect_some("a deleted channel event will be delivered")
- .await;
-}
-
-#[tokio::test]
-async fn previously_expired() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Expire channels
-
- app.channels()
- .expire(&fixtures::now())
- .await
- .expect("expiring channels always succeeds");
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Check for expiry event
- let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
- .filter(|event| future::ready(event.id == channel.id))
- .next()
- .expect_some("a deleted channel event will be delivered")
- .await;
-}
-
-#[tokio::test]
-async fn deleting() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Delete the channel
-
- app.channels()
- .delete(&channel.id, &fixtures::now())
- .await
- .expect("deleting a valid channel succeeds");
-
- // Check for delete event
- let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
- .filter(|event| future::ready(event.id == channel.id))
- .next()
- .expect_some("a deleted channel event will be delivered")
- .await;
-}
-
-#[tokio::test]
-async fn previously_deleted() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Delete the channel
-
- app.channels()
- .delete(&channel.id, &fixtures::now())
- .await
- .expect("deleting a valid channel succeeds");
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Check for expiry event
- let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
- .filter(|event| future::ready(event.id == channel.id))
- .next()
- .expect_some("a deleted channel event will be delivered")
- .await;
-}
-
-#[tokio::test]
-async fn previously_purged() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Delete and purge the channel
-
- app.channels()
- .delete(&channel.id, &fixtures::ancient())
- .await
- .expect("deleting a valid channel succeeds");
-
- app.channels()
- .purge(&fixtures::now())
- .await
- .expect("purging channels always succeeds");
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Check for expiry event
- events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
- .filter(|event| future::ready(event.id == channel.id))
- .next()
- .expect_wait("deleted channel events not delivered")
- .await;
-}
diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs
deleted file mode 100644
index 1d1bec6..0000000
--- a/src/event/routes/test/invite.rs
+++ /dev/null
@@ -1,90 +0,0 @@
-use axum::extract::State;
-use axum_extra::extract::Query;
-use futures::{future, stream::StreamExt as _};
-
-use crate::{
- event::routes::get,
- test::fixtures::{self, future::Expect as _},
-};
-
-#[tokio::test]
-async fn accepting_invite() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let issuer = fixtures::user::create(&app, &fixtures::now()).await;
- let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Accept the invite
-
- let (name, password) = fixtures::user::propose();
- let (joiner, _) = app
- .invites()
- .accept(&invite.id, &name, &password, &fixtures::now())
- .await
- .expect("accepting an invite succeeds");
-
- // Expect a login created event
-
- let _ = events
- .filter_map(fixtures::event::user)
- .filter_map(fixtures::event::user::created)
- .filter(|event| future::ready(event.user == joiner))
- .next()
- .expect_some("a login created event is sent")
- .await;
-}
-
-#[tokio::test]
-async fn previously_accepted_invite() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let issuer = fixtures::user::create(&app, &fixtures::now()).await;
- let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Accept the invite
-
- let (name, password) = fixtures::user::propose();
- let (joiner, _) = app
- .invites()
- .accept(&invite.id, &name, &password, &fixtures::now())
- .await
- .expect("accepting an invite succeeds");
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Expect a login created event
-
- let _ = events
- .filter_map(fixtures::event::user)
- .filter_map(fixtures::event::user::created)
- .filter(|event| future::ready(event.user == joiner))
- .next()
- .expect_some("a login created event is sent")
- .await;
-}
diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs
deleted file mode 100644
index 84a3aec..0000000
--- a/src/event/routes/test/message.rs
+++ /dev/null
@@ -1,396 +0,0 @@
-use axum::extract::State;
-use axum_extra::extract::Query;
-use futures::{
- future,
- stream::{self, StreamExt as _},
-};
-
-use crate::{
- event::routes::get,
- test::fixtures::{self, future::Expect as _},
-};
-
-#[tokio::test]
-async fn sending() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Send a message
-
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- let message = app
- .messages()
- .send(
- &channel.id,
- &sender,
- &fixtures::now(),
- &fixtures::message::propose(),
- )
- .await
- .expect("sending a message succeeds");
-
- // Verify that an event is delivered
-
- let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
- .filter(|event| future::ready(event.message == message))
- .next()
- .expect_some("delivered message sent event")
- .await;
-}
-
-#[tokio::test]
-async fn previously_sent() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Send a message
-
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- let message = app
- .messages()
- .send(
- &channel.id,
- &sender,
- &fixtures::now(),
- &fixtures::message::propose(),
- )
- .await
- .expect("sending a message succeeds");
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify that an event is delivered
-
- let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
- .filter(|event| future::ready(event.message == message))
- .next()
- .expect_some("delivered message sent event")
- .await;
-}
-
-#[tokio::test]
-async fn sent_in_multiple_channels() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- let channels = [
- fixtures::channel::create(&app, &fixtures::now()).await,
- fixtures::channel::create(&app, &fixtures::now()).await,
- ];
-
- let messages = stream::iter(channels)
- .then(|channel| {
- let app = app.clone();
- let sender = sender.clone();
- let channel = channel.clone();
- async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await }
- })
- .collect::<Vec<_>>()
- .await;
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
- .take(messages.len())
- .collect::<Vec<_>>()
- .expect_ready("events ready")
- .await;
-
- for message in &messages {
- assert!(events.iter().any(|event| &event.message == message));
- }
-}
-
-#[tokio::test]
-async fn sent_sequentially() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- let messages = vec![
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify the expected events in the expected order
-
- let mut events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
- .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)));
-
- for message in &messages {
- let event = events
- .next()
- .expect_some("undelivered messages remaining")
- .await;
-
- assert_eq!(message, &event.message);
- }
-}
-
-#[tokio::test]
-async fn expiring() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
- let sender = fixtures::user::create(&app, &fixtures::ancient()).await;
- let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Expire messages
-
- app.messages()
- .expire(&fixtures::now())
- .await
- .expect("expiring messages always succeeds");
-
- // Check for expiry event
- let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
- .filter(|event| future::ready(event.id == message.id))
- .next()
- .expect_some("a deleted message event will be delivered")
- .await;
-}
-
-#[tokio::test]
-async fn previously_expired() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
- let sender = fixtures::user::create(&app, &fixtures::ancient()).await;
- let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Expire messages
-
- app.messages()
- .expire(&fixtures::now())
- .await
- .expect("expiring messages always succeeds");
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Check for expiry event
- let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
- .filter(|event| future::ready(event.id == message.id))
- .next()
- .expect_some("a deleted message event will be delivered")
- .await;
-}
-
-#[tokio::test]
-async fn deleting() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Delete the message
-
- app.messages()
- .delete(&sender, &message.id, &fixtures::now())
- .await
- .expect("deleting a valid message succeeds");
-
- // Check for delete event
- let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
- .filter(|event| future::ready(event.id == message.id))
- .next()
- .expect_some("a deleted message event will be delivered")
- .await;
-}
-
-#[tokio::test]
-async fn previously_deleted() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Delete the message
-
- app.messages()
- .delete(&sender, &message.id, &fixtures::now())
- .await
- .expect("deleting a valid message succeeds");
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Check for delete event
- let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
- .filter(|event| future::ready(event.id == message.id))
- .next()
- .expect_some("a deleted message event will be delivered")
- .await;
-}
-
-#[tokio::test]
-async fn previously_purged() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
- let sender = fixtures::user::create(&app, &fixtures::ancient()).await;
- let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Purge the message
-
- app.messages()
- .delete(&sender, &message.id, &fixtures::ancient())
- .await
- .expect("deleting a valid message succeeds");
-
- app.messages()
- .purge(&fixtures::now())
- .await
- .expect("purge always succeeds");
-
- // Subscribe
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Check for delete event
-
- events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
- .filter(|event| future::ready(event.id == message.id))
- .next()
- .expect_wait("no deleted message will be delivered")
- .await;
-}
diff --git a/src/event/routes/test/mod.rs b/src/event/routes/test/mod.rs
deleted file mode 100644
index e7e35f1..0000000
--- a/src/event/routes/test/mod.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-mod channel;
-mod invite;
-mod message;
-mod resume;
-mod setup;
-mod token;
diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs
deleted file mode 100644
index 633eae3..0000000
--- a/src/event/routes/test/resume.rs
+++ /dev/null
@@ -1,227 +0,0 @@
-use std::future;
-
-use axum::extract::State;
-use axum_extra::extract::Query;
-use futures::stream::{self, StreamExt as _};
-
-use crate::{
- event::{Sequenced as _, routes::get},
- test::fixtures::{self, future::Expect as _},
-};
-
-#[tokio::test]
-async fn resume() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
-
- let later_messages = vec![
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let resume_at = {
- // First subscription
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- let event = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
- .filter(|event| future::ready(event.message == initial_message))
- .next()
- .expect_some("delivered event for initial message")
- .await;
-
- event.sequence()
- };
-
- // Resume after disconnect
- let get::Response(resumed) = get::handler(
- State(app),
- subscriber,
- Some(resume_at.into()),
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify final events
-
- let mut events = resumed
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
- .zip(stream::iter(later_messages));
-
- while let Some((event, message)) = events.next().expect_ready("event ready").await {
- assert_eq!(message, event.message);
- }
-}
-
-// This test verifies a real bug I hit developing the vector-of-sequences
-// approach to resuming events. A small omission caused the event IDs in a
-// resumed stream to _omit_ channels that were in the original stream until
-// those channels also appeared in the resumed stream.
-//
-// Clients would see something like
-// * In the original stream, Cfoo=5,Cbar=8
-// * In the resumed stream, Cfoo=6 (no Cbar sequence number)
-//
-// Disconnecting and reconnecting a second time, using event IDs from that
-// initial period of the first resume attempt, would then cause the second
-// resume attempt to restart all other channels from the beginning, and not
-// from where the first disconnection happened.
-//
-// As we have switched to a single global event sequence number, this scenario
-// can no longer arise, but this test is preserved because the actual behaviour
-// _is_ a valid way for clients to behave, and should work. We might as well
-// keep testing it.
-#[tokio::test]
-async fn serial_resume() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
- let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let resume_at = {
- let initial_messages = [
- fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
- ];
-
- // First subscription
-
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Check for expected events
-
- let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
- .zip(stream::iter(initial_messages))
- .collect::<Vec<_>>()
- .expect_ready("zipping a finite list of events is ready immediately")
- .await;
-
- assert!(
- events
- .iter()
- .all(|(event, message)| message == &event.message)
- );
-
- let (event, _) = events.last().expect("this vec is non-empty");
-
- // Take the last one's resume point
-
- event.sequence()
- };
-
- // Resume after disconnect
- let resume_at = {
- let resume_messages = [
- // Note that channel_b does not appear here. The buggy behaviour
- // would be masked if channel_b happened to send a new message
- // into the resumed event stream.
- fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
- ];
-
- // Second subscription
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- Some(resume_at.into()),
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Check for expected events
-
- let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
- .zip(stream::iter(resume_messages))
- .collect::<Vec<_>>()
- .expect_ready("zipping a finite list of events is ready immediately")
- .await;
-
- assert!(
- events
- .iter()
- .all(|(event, message)| message == &event.message)
- );
-
- let (event, _) = events.last().expect("this vec is non-empty");
-
- // Take the last one's resume point
-
- event.sequence()
- };
-
- // Resume after disconnect a second time
- {
- // At this point, we can send on either channel and demonstrate the
- // problem. The resume point should before both of these messages, but
- // after _all_ prior messages.
- let final_messages = [
- fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
- ];
-
- // Third subscription
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- Some(resume_at.into()),
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Check for expected events
-
- let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
- .zip(stream::iter(final_messages))
- .collect::<Vec<_>>()
- .expect_ready("zipping a finite list of events is ready immediately")
- .await;
-
- assert!(
- events
- .iter()
- .all(|(event, message)| message == &event.message)
- );
- };
-}
diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs
deleted file mode 100644
index 1170fe4..0000000
--- a/src/event/routes/test/setup.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use axum::extract::State;
-use axum_extra::extract::Query;
-use futures::{future, stream::StreamExt as _};
-
-use crate::{
- event::routes::get,
- test::fixtures::{self, future::Expect as _},
-};
-
-// There's no test for this in subscribe-then-setup order because creating an
-// identity to subscribe with also completes initial setup, preventing the
-// test from running. That is also a can't-happen scenario in reality.
-#[tokio::test]
-async fn previously_completed() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Complete initial setup
-
- let (name, password) = fixtures::user::propose();
- let (owner, _) = app
- .setup()
- .initial(&name, &password, &fixtures::now())
- .await
- .expect("initial setup in an empty app succeeds");
-
- // Subscribe to events
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Expect a login created event
-
- let _ = events
- .filter_map(fixtures::event::user)
- .filter_map(fixtures::event::user::created)
- .filter(|event| future::ready(event.user == owner))
- .next()
- .expect_some("a login created event is sent")
- .await;
-}
diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs
deleted file mode 100644
index a467de5..0000000
--- a/src/event/routes/test/token.rs
+++ /dev/null
@@ -1,151 +0,0 @@
-use axum::extract::State;
-use axum_extra::extract::Query;
-use futures::{future, stream::StreamExt as _};
-
-use crate::{
- event::routes::get,
- test::fixtures::{self, future::Expect as _},
-};
-
-#[tokio::test]
-async fn terminates_on_token_expiry() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Subscribe via the endpoint
-
- let subscriber_creds = fixtures::user::create_with_password(&app, &fixtures::now()).await;
- let subscriber =
- fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await;
-
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber,
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify the resulting stream's behaviour
-
- app.tokens()
- .expire(&fixtures::now())
- .await
- .expect("expiring tokens succeeds");
-
- // These should not be delivered.
- let messages = [
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
- .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
- .next()
- .expect_none("end of stream")
- .await;
-}
-
-#[tokio::test]
-async fn terminates_on_logout() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Subscribe via the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify the resulting stream's behaviour
-
- app.tokens()
- .logout(&subscriber.token)
- .await
- .expect("expiring tokens succeeds");
-
- // These should not be delivered.
-
- let messages = [
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
- .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
- .next()
- .expect_none("end of stream")
- .await;
-}
-
-#[tokio::test]
-async fn terminates_on_password_change() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Subscribe via the endpoint
-
- let creds = fixtures::user::create_with_password(&app, &fixtures::now()).await;
- let cookie = fixtures::cookie::logged_in(&app, &creds, &fixtures::now()).await;
- let subscriber = fixtures::identity::from_cookie(&app, &cookie, &fixtures::now()).await;
-
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query(get::QueryParams { resume_point }),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify the resulting stream's behaviour
-
- let (_, password) = creds;
- let to = fixtures::user::propose_password();
- app.tokens()
- .change_password(&subscriber.user, &password, &to, &fixtures::now())
- .await
- .expect("expiring tokens succeeds");
-
- // These should not be delivered.
-
- let messages = [
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
- .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
- .next()
- .expect_none("end of stream")
- .await;
-}