summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKit La Touche <kit@transneptune.net>2024-10-25 22:16:03 -0400
committerKit La Touche <kit@transneptune.net>2024-10-25 22:16:03 -0400
commita50911a03c8955e08c77b0f3764dbda963013971 (patch)
tree9f5319191438b85b860ba06c9a203d3f129072a1 /src
parent4c49283553f4b18bb2a74de280b340a073e3253e (diff)
parentc87b5c53077c02bf21234e24bf976aa7a5f2bac8 (diff)
Merge branch 'main' into wip/mobile
Diffstat (limited to 'src')
-rw-r--r--src/app.rs2
-rw-r--r--src/channel/app.rs10
-rw-r--r--src/channel/routes/channel/delete.rs23
-rw-r--r--src/channel/routes/channel/test/delete.rs17
-rw-r--r--src/channel/routes/channel/test/post.rs29
-rw-r--r--src/channel/routes/test.rs15
-rw-r--r--src/clock.rs2
-rw-r--r--src/event/routes/test.rs459
-rw-r--r--src/event/routes/test/channel.rs241
-rw-r--r--src/event/routes/test/invite.rs80
-rw-r--r--src/event/routes/test/message.rs349
-rw-r--r--src/event/routes/test/mod.rs6
-rw-r--r--src/event/routes/test/resume.rs219
-rw-r--r--src/event/routes/test/setup.rs45
-rw-r--r--src/event/routes/test/token.rs95
-rw-r--r--src/invite/app.rs16
-rw-r--r--src/invite/mod.rs2
-rw-r--r--src/invite/routes/invite/mod.rs2
-rw-r--r--src/invite/routes/invite/post.rs3
-rw-r--r--src/invite/routes/invite/test/get.rs65
-rw-r--r--src/invite/routes/invite/test/mod.rs2
-rw-r--r--src/invite/routes/invite/test/post.rs208
-rw-r--r--src/invite/routes/mod.rs2
-rw-r--r--src/invite/routes/post.rs2
-rw-r--r--src/invite/routes/test.rs28
-rw-r--r--src/login/password.rs2
-rw-r--r--src/message/routes/message/mod.rs21
-rw-r--r--src/message/routes/message/test.rs7
-rw-r--r--src/setup/routes/mod.rs2
-rw-r--r--src/setup/routes/test.rs69
-rw-r--r--src/test/fixtures/channel.rs17
-rw-r--r--src/test/fixtures/event.rs83
-rw-r--r--src/test/fixtures/future.rs240
-rw-r--r--src/test/fixtures/invite.rs17
-rw-r--r--src/test/fixtures/message.rs10
-rw-r--r--src/test/fixtures/mod.rs1
-rw-r--r--src/ui/mime.rs5
-rw-r--r--src/ui/routes/ch/channel.rs19
38 files changed, 1805 insertions, 610 deletions
diff --git a/src/app.rs b/src/app.rs
index 6d71259..bc1daa5 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -44,7 +44,7 @@ impl App {
}
pub const fn invites(&self) -> Invites {
- Invites::new(&self.db)
+ Invites::new(&self.db, &self.events)
}
#[cfg(not(test))]
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 7bfa0f7..8359277 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -4,7 +4,7 @@ use sqlx::sqlite::SqlitePool;
use super::{
repo::{LoadError, Provider as _},
- Channel, History, Id,
+ Channel, Id,
};
use crate::{
clock::DateTime,
@@ -42,12 +42,14 @@ impl<'a> Channels<'a> {
// This function is careless with respect to time, and gets you the channel as
// it exists in the specific moment when you call it.
- pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, Error> {
+ pub async fn get(&self, channel: &Id) -> Result<Channel, Error> {
+ let not_found = || Error::NotFound(channel.clone());
+
let mut tx = self.db.begin().await?;
- let channel = tx.channels().by_id(channel).await.optional()?;
+ let channel = tx.channels().by_id(channel).await.not_found(not_found)?;
tx.commit().await?;
- Ok(channel.as_ref().and_then(History::as_snapshot))
+ channel.as_snapshot().ok_or_else(not_found)
}
pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
diff --git a/src/channel/routes/channel/delete.rs b/src/channel/routes/channel/delete.rs
index 91eb506..2d2b5f1 100644
--- a/src/channel/routes/channel/delete.rs
+++ b/src/channel/routes/channel/delete.rs
@@ -1,12 +1,12 @@
use axum::{
- extract::{Path, State},
+ extract::{Json, Path, State},
http::StatusCode,
- response::{IntoResponse, Response},
+ response::{self, IntoResponse},
};
use crate::{
app::App,
- channel::app,
+ channel::{self, app},
clock::RequestedAt,
error::{Internal, NotFound},
token::extract::Identity,
@@ -17,10 +17,21 @@ pub async fn handler(
Path(channel): Path<super::PathInfo>,
RequestedAt(deleted_at): RequestedAt,
_: Identity,
-) -> Result<StatusCode, Error> {
+) -> Result<Response, Error> {
app.channels().delete(&channel, &deleted_at).await?;
- Ok(StatusCode::ACCEPTED)
+ Ok(Response { id: channel })
+}
+
+#[derive(Debug, serde::Serialize)]
+pub struct Response {
+ pub id: channel::Id,
+}
+
+impl IntoResponse for Response {
+ fn into_response(self) -> response::Response {
+ (StatusCode::ACCEPTED, Json(self)).into_response()
+ }
}
#[derive(Debug, thiserror::Error)]
@@ -28,7 +39,7 @@ pub async fn handler(
pub struct Error(#[from] pub app::Error);
impl IntoResponse for Error {
- fn into_response(self) -> Response {
+ fn into_response(self) -> response::Response {
let Self(error) = self;
#[allow(clippy::match_wildcard_for_single_variants)]
match error {
diff --git a/src/channel/routes/channel/test/delete.rs b/src/channel/routes/channel/test/delete.rs
index e9af12f..0371b0a 100644
--- a/src/channel/routes/channel/test/delete.rs
+++ b/src/channel/routes/channel/test/delete.rs
@@ -1,7 +1,4 @@
-use axum::{
- extract::{Path, State},
- http::StatusCode,
-};
+use axum::extract::{Path, State};
use crate::{
channel::{app, routes::channel::delete},
@@ -9,7 +6,7 @@ use crate::{
};
#[tokio::test]
-pub async fn delete_channel() {
+pub async fn valid_channel() {
// Set up the environment
let app = fixtures::scratch_app().await;
@@ -29,7 +26,7 @@ pub async fn delete_channel() {
// Verify the response
- assert_eq!(response, StatusCode::ACCEPTED);
+ assert_eq!(channel.id, response.id);
// Verify the semantics
@@ -38,7 +35,7 @@ pub async fn delete_channel() {
}
#[tokio::test]
-pub async fn delete_invalid_channel_id() {
+pub async fn invalid_channel_id() {
// Set up the environment
let app = fixtures::scratch_app().await;
@@ -62,7 +59,7 @@ pub async fn delete_invalid_channel_id() {
}
#[tokio::test]
-pub async fn delete_deleted() {
+pub async fn channel_deleted() {
// Set up the environment
let app = fixtures::scratch_app().await;
@@ -91,7 +88,7 @@ pub async fn delete_deleted() {
}
#[tokio::test]
-pub async fn delete_expired() {
+pub async fn channel_expired() {
// Set up the environment
let app = fixtures::scratch_app().await;
@@ -120,7 +117,7 @@ pub async fn delete_expired() {
}
#[tokio::test]
-pub async fn delete_purged() {
+pub async fn channel_purged() {
// Set up the environment
let app = fixtures::scratch_app().await;
diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs
index 67e7d36..111a703 100644
--- a/src/channel/routes/channel/test/post.rs
+++ b/src/channel/routes/channel/test/post.rs
@@ -1,11 +1,11 @@
use axum::extract::{Json, Path, State};
-use futures::stream::StreamExt;
+use futures::stream::{self, StreamExt as _};
use crate::{
channel::{self, routes::channel::post},
event::Sequenced,
- message::{self, app::SendError},
- test::fixtures::{self, future::Immediately as _},
+ message::app::SendError,
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -39,24 +39,23 @@ async fn messages_in_order() {
// Verify the semantics
- let events = app
+ let mut events = app
.events()
.subscribe(None)
.await
.expect("subscribing to a valid channel succeeds")
- .filter_map(fixtures::message::events)
- .take(requests.len());
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .zip(stream::iter(requests));
- let events = events.collect::<Vec<_>>().immediately().await;
-
- for ((sent_at, message), event) in requests.into_iter().zip(events) {
+ while let Some((event, (sent_at, body))) = events
+ .next()
+ .expect_ready("an event should be ready for each message")
+ .await
+ {
assert_eq!(*sent_at, event.at());
- assert!(matches!(
- event,
- message::Event::Sent(event)
- if event.message.sender == sender.login.id
- && event.message.body == message
- ));
+ assert_eq!(sender.login.id, event.message.sender);
+ assert_eq!(body, event.message.body);
}
}
diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs
index 216eba1..10b1e8d 100644
--- a/src/channel/routes/test.rs
+++ b/src/channel/routes/test.rs
@@ -7,7 +7,7 @@ use super::post;
use crate::{
channel::app,
name::Name,
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -39,7 +39,6 @@ async fn new_channel() {
.channels()
.get(&response.id)
.await
- .expect("searching for channels by ID never fails")
.expect("the newly-created channel exists");
assert_eq!(response, channel);
@@ -48,15 +47,11 @@ async fn new_channel() {
.subscribe(None)
.await
.expect("subscribing never fails")
- .filter_map(fixtures::channel::events)
- .filter_map(fixtures::channel::created)
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
.filter(|event| future::ready(event.channel == response));
- let event = events
- .next()
- .immediately()
- .await
- .expect("creation event published");
+ let event = events.next().expect_some("creation event published").await;
assert_eq!(event.channel, response);
}
@@ -165,7 +160,6 @@ async fn name_reusable_after_delete() {
.channels()
.get(&response.id)
.await
- .expect("searching for channels by ID never fails")
.expect("the newly-created channel exists");
assert_eq!(response, channel);
}
@@ -215,7 +209,6 @@ async fn name_reusable_after_expiry() {
.channels()
.get(&response.id)
.await
- .expect("searching for channels by ID never fails")
.expect("the newly-created channel exists");
assert_eq!(response, channel);
}
diff --git a/src/clock.rs b/src/clock.rs
index 9ffef82..242bcdf 100644
--- a/src/clock.rs
+++ b/src/clock.rs
@@ -12,7 +12,7 @@ pub type DateTime = chrono::DateTime<chrono::Utc>;
// calculated once per request, even if the extractor is used in multiple
// places. This requires the [middleware] function to be installed with
// [axum::middleware::from_fn] around the current route.
-#[derive(Clone)]
+#[derive(Debug, Clone)]
pub struct RequestedAt(pub DateTime);
impl RequestedAt {
diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs
deleted file mode 100644
index 49f8094..0000000
--- a/src/event/routes/test.rs
+++ /dev/null
@@ -1,459 +0,0 @@
-use axum::extract::State;
-use axum_extra::extract::Query;
-use futures::{
- future,
- stream::{self, StreamExt as _},
-};
-
-use super::get;
-use crate::{
- event::Sequenced as _,
- test::fixtures::{self, future::Immediately as _},
-};
-
-#[tokio::test]
-async fn includes_historical_message() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let event = events
- .filter_map(fixtures::message::events)
- .next()
- .immediately()
- .await
- .expect("delivered stored message");
-
- assert!(fixtures::event::message_sent(&event, &message));
-}
-
-#[tokio::test]
-async fn includes_live_message() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the semantics
-
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
- let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
-
- let event = events
- .filter_map(fixtures::message::events)
- .next()
- .immediately()
- .await
- .expect("delivered live message");
-
- assert!(fixtures::event::message_sent(&event, &message));
-}
-
-#[tokio::test]
-async fn includes_multiple_channels() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
-
- let channels = [
- fixtures::channel::create(&app, &fixtures::now()).await,
- fixtures::channel::create(&app, &fixtures::now()).await,
- ];
-
- let messages = stream::iter(channels)
- .then(|channel| {
- let app = app.clone();
- let sender = sender.clone();
- let channel = channel.clone();
- async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await }
- })
- .collect::<Vec<_>>()
- .await;
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let events = events
- .filter_map(fixtures::message::events)
- .take(messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- for message in &messages {
- assert!(events
- .iter()
- .any(|event| fixtures::event::message_sent(event, message)));
- }
-}
-
-#[tokio::test]
-async fn sequential_messages() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
-
- let messages = vec![
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let mut events = events
- .filter_map(fixtures::message::events)
- .filter(|event| {
- future::ready(
- messages
- .iter()
- .any(|message| fixtures::event::message_sent(event, message)),
- )
- });
-
- // Verify delivery in order
- for message in &messages {
- let event = events
- .next()
- .immediately()
- .await
- .expect("undelivered messages remaining");
-
- assert!(fixtures::event::message_sent(&event, message));
- }
-}
-
-#[tokio::test]
-async fn resumes_from() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
-
- let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
-
- let later_messages = vec![
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let resume_at = {
- // First subscription
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- let event = events
- .filter_map(fixtures::message::events)
- .next()
- .immediately()
- .await
- .expect("delivered events");
-
- assert!(fixtures::event::message_sent(&event, &initial_message));
-
- event.sequence()
- };
-
- // Resume after disconnect
- let get::Response(resumed) = get::handler(
- State(app),
- subscriber,
- Some(resume_at.into()),
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let events = resumed
- .filter_map(fixtures::message::events)
- .take(later_messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- for message in &later_messages {
- assert!(events
- .iter()
- .any(|event| fixtures::event::message_sent(event, message)));
- }
-}
-
-// This test verifies a real bug I hit developing the vector-of-sequences
-// approach to resuming events. A small omission caused the event IDs in a
-// resumed stream to _omit_ channels that were in the original stream until
-// those channels also appeared in the resumed stream.
-//
-// Clients would see something like
-// * In the original stream, Cfoo=5,Cbar=8
-// * In the resumed stream, Cfoo=6 (no Cbar sequence number)
-//
-// Disconnecting and reconnecting a second time, using event IDs from that
-// initial period of the first resume attempt, would then cause the second
-// resume attempt to restart all other channels from the beginning, and not
-// from where the first disconnection happened.
-//
-// This is a real and valid behaviour for clients!
-#[tokio::test]
-async fn serial_resume() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
- let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
- let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let resume_at = {
- let initial_messages = [
- fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
- ];
-
- // First subscription
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- let events = events
- .filter_map(fixtures::message::events)
- .take(initial_messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- for message in &initial_messages {
- assert!(events
- .iter()
- .any(|event| fixtures::event::message_sent(event, message)));
- }
-
- let event = events.last().expect("this vec is non-empty");
-
- event.sequence()
- };
-
- // Resume after disconnect
- let resume_at = {
- let resume_messages = [
- // Note that channel_b does not appear here. The buggy behaviour
- // would be masked if channel_b happened to send a new message
- // into the resumed event stream.
- fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
- ];
-
- // Second subscription
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- Some(resume_at.into()),
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- let events = events
- .filter_map(fixtures::message::events)
- .take(resume_messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- for message in &resume_messages {
- assert!(events
- .iter()
- .any(|event| fixtures::event::message_sent(event, message)));
- }
-
- let event = events.last().expect("this vec is non-empty");
-
- event.sequence()
- };
-
- // Resume after disconnect a second time
- {
- // At this point, we can send on either channel and demonstrate the
- // problem. The resume point should before both of these messages, but
- // after _all_ prior messages.
- let final_messages = [
- fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
- ];
-
- // Third subscription
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- Some(resume_at.into()),
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- let events = events
- .filter_map(fixtures::message::events)
- .take(final_messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- // This set of messages, in particular, _should not_ include any prior
- // messages from `initial_messages` or `resume_messages`.
- for message in &final_messages {
- assert!(events
- .iter()
- .any(|event| fixtures::event::message_sent(event, message)));
- }
- };
-}
-
-#[tokio::test]
-async fn terminates_on_token_expiry() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
-
- // Subscribe via the endpoint
-
- let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
- let subscriber =
- fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await;
-
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the resulting stream's behaviour
-
- app.tokens()
- .expire(&fixtures::now())
- .await
- .expect("expiring tokens succeeds");
-
- // These should not be delivered.
- let messages = [
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- assert!(events
- .filter_map(fixtures::message::events)
- .filter(|event| future::ready(
- messages
- .iter()
- .any(|message| fixtures::event::message_sent(event, message))
- ))
- .next()
- .immediately()
- .await
- .is_none());
-}
-
-#[tokio::test]
-async fn terminates_on_logout() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
-
- // Subscribe via the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify the resulting stream's behaviour
-
- app.tokens()
- .logout(&subscriber.token)
- .await
- .expect("expiring tokens succeeds");
-
- // These should not be delivered.
- let messages = [
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- assert!(events
- .filter_map(fixtures::message::events)
- .filter(|event| future::ready(
- messages
- .iter()
- .any(|message| fixtures::event::message_sent(event, message))
- ))
- .next()
- .immediately()
- .await
- .is_none());
-}
diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs
new file mode 100644
index 0000000..6a0a803
--- /dev/null
+++ b/src/event/routes/test/channel.rs
@@ -0,0 +1,241 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::{
+ event::routes::get,
+ test::fixtures::{self, future::Expect as _},
+};
+
+#[tokio::test]
+async fn creating() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Create a channel
+
+ let name = fixtures::channel::propose();
+ let channel = app
+ .channels()
+ .create(&name, &fixtures::now())
+ .await
+ .expect("creating a channel succeeds");
+
+ // Verify channel created event
+
+ events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .filter(|event| future::ready(event.channel == channel))
+ .next()
+ .expect_some("channel created event is delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_created() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Create a channel
+
+ let name = fixtures::channel::propose();
+ let channel = app
+ .channels()
+ .create(&name, &fixtures::now())
+ .await
+ .expect("creating a channel succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify channel created event
+
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .filter(|event| future::ready(event.channel == channel))
+ .next()
+ .expect_some("channel created event is delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn expiring() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Expire channels
+
+ app.channels()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring channels always succeeds");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_expired() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+
+ // Expire channels
+
+ app.channels()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring channels always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn deleting() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Delete the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::now())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ // Check for delete event
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_deleted() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Delete the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::now())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_purged() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+
+ // Delete and purge the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::ancient())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ app.channels()
+ .purge(&fixtures::now())
+ .await
+ .expect("purging channels always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_wait("deleted channel events not delivered")
+ .await;
+}
diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs
new file mode 100644
index 0000000..d24f474
--- /dev/null
+++ b/src/event/routes/test/invite.rs
@@ -0,0 +1,80 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::{
+ event::routes::get,
+ test::fixtures::{self, future::Expect as _},
+};
+
+#[tokio::test]
+async fn accepting_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::login::create(&app, &fixtures::now()).await;
+ let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Accept the invite
+
+ let (name, password) = fixtures::login::propose();
+ let (joiner, _) = app
+ .invites()
+ .accept(&invite.id, &name, &password, &fixtures::now())
+ .await
+ .expect("accepting an invite succeeds");
+
+ // Expect a login created event
+
+ let _ = events
+ .filter_map(fixtures::event::login)
+ .filter_map(fixtures::event::login::created)
+ .filter(|event| future::ready(event.login == joiner))
+ .next()
+ .expect_some("a login created event is sent")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_accepted_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::login::create(&app, &fixtures::now()).await;
+ let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+
+ // Accept the invite
+
+ let (name, password) = fixtures::login::propose();
+ let (joiner, _) = app
+ .invites()
+ .accept(&invite.id, &name, &password, &fixtures::now())
+ .await
+ .expect("accepting an invite succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Expect a login created event
+
+ let _ = events
+ .filter_map(fixtures::event::login)
+ .filter_map(fixtures::event::login::created)
+ .filter(|event| future::ready(event.login == joiner))
+ .next()
+ .expect_some("a login created event is sent")
+ .await;
+}
diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs
new file mode 100644
index 0000000..63a3f43
--- /dev/null
+++ b/src/event/routes/test/message.rs
@@ -0,0 +1,349 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+};
+
+use crate::{
+ event::routes::get,
+ test::fixtures::{self, future::Expect as _},
+};
+
+#[tokio::test]
+async fn sending() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Send a message
+
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = app
+ .messages()
+ .send(
+ &channel.id,
+ &sender,
+ &fixtures::now(),
+ &fixtures::message::propose(),
+ )
+ .await
+ .expect("sending a message succeeds");
+
+ // Verify that an event is delivered
+
+ let _ = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(event.message == message))
+ .next()
+ .expect_some("delivered message sent event")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_sent() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Send a message
+
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = app
+ .messages()
+ .send(
+ &channel.id,
+ &sender,
+ &fixtures::now(),
+ &fixtures::message::propose(),
+ )
+ .await
+ .expect("sending a message succeeds");
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify that an event is delivered
+
+ let _ = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(event.message == message))
+ .next()
+ .expect_some("delivered message sent event")
+ .await;
+}
+
+#[tokio::test]
+async fn sent_in_multiple_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ let channels = [
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ ];
+
+ let messages = stream::iter(channels)
+ .then(|channel| {
+ let app = app.clone();
+ let sender = sender.clone();
+ let channel = channel.clone();
+ async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await }
+ })
+ .collect::<Vec<_>>()
+ .await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the structure of the response.
+
+ let events = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .take(messages.len())
+ .collect::<Vec<_>>()
+ .expect_ready("events ready")
+ .await;
+
+ for message in &messages {
+ assert!(events.iter().any(|event| &event.message == message));
+ }
+}
+
+#[tokio::test]
+async fn sent_sequentially() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ let messages = vec![
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the expected events in the expected order
+
+ let mut events = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)));
+
+ for message in &messages {
+ let event = events
+ .next()
+ .expect_some("undelivered messages remaining")
+ .await;
+
+ assert_eq!(message, &event.message);
+ }
+}
+
+#[tokio::test]
+async fn expiring() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Expire messages
+
+ app.messages()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring messages always succeeds");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .expect_some("a deleted message event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_expired() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+
+ // Expire messages
+
+ app.messages()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring messages always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .expect_some("a deleted message event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn deleting() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Delete the message
+
+ app.messages()
+ .delete(&message.id, &fixtures::now())
+ .await
+ .expect("deleting a valid message succeeds");
+
+ // Check for delete event
+ let _ = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .expect_some("a deleted message event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_deleted() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ // Delete the message
+
+ app.messages()
+ .delete(&message.id, &fixtures::now())
+ .await
+ .expect("deleting a valid message succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for delete event
+ let _ = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .expect_some("a deleted message event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_purged() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+
+ // Purge the message
+
+ app.messages()
+ .delete(&message.id, &fixtures::ancient())
+ .await
+ .expect("deleting a valid message succeeds");
+
+ app.messages()
+ .purge(&fixtures::now())
+ .await
+ .expect("purge always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for delete event
+
+ events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .expect_wait("no deleted message will be delivered")
+ .await;
+}
diff --git a/src/event/routes/test/mod.rs b/src/event/routes/test/mod.rs
new file mode 100644
index 0000000..e7e35f1
--- /dev/null
+++ b/src/event/routes/test/mod.rs
@@ -0,0 +1,6 @@
+mod channel;
+mod invite;
+mod message;
+mod resume;
+mod setup;
+mod token;
diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs
new file mode 100644
index 0000000..62b9bad
--- /dev/null
+++ b/src/event/routes/test/resume.rs
@@ -0,0 +1,219 @@
+use std::future;
+
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::stream::{self, StreamExt as _};
+
+use crate::{
+ event::{routes::get, Sequenced as _},
+ test::fixtures::{self, future::Expect as _},
+};
+
+#[tokio::test]
+async fn resume() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ let later_messages = vec![
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let resume_at = {
+ // First subscription
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ let event = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(event.message == initial_message))
+ .next()
+ .expect_some("delivered event for initial message")
+ .await;
+
+ event.sequence()
+ };
+
+ // Resume after disconnect
+ let get::Response(resumed) = get::handler(
+ State(app),
+ subscriber,
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify final events
+
+ let mut events = resumed
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .zip(stream::iter(later_messages));
+
+ while let Some((event, message)) = events.next().expect_ready("event ready").await {
+ assert_eq!(message, event.message);
+ }
+}
+
+// This test verifies a real bug I hit developing the vector-of-sequences
+// approach to resuming events. A small omission caused the event IDs in a
+// resumed stream to _omit_ channels that were in the original stream until
+// those channels also appeared in the resumed stream.
+//
+// Clients would see something like
+// * In the original stream, Cfoo=5,Cbar=8
+// * In the resumed stream, Cfoo=6 (no Cbar sequence number)
+//
+// Disconnecting and reconnecting a second time, using event IDs from that
+// initial period of the first resume attempt, would then cause the second
+// resume attempt to restart all other channels from the beginning, and not
+// from where the first disconnection happened.
+//
+// As we have switched to a single global event sequence number, this scenario
+// can no longer arise, but this test is preserved because the actual behaviour
+// _is_ a valid way for clients to behave, and should work. We might as well
+// keep testing it.
+#[tokio::test]
+async fn serial_resume() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
+ let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let resume_at = {
+ let initial_messages = [
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
+ ];
+
+ // First subscription
+
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expected events
+
+ let events = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .zip(stream::iter(initial_messages))
+ .collect::<Vec<_>>()
+ .expect_ready("zipping a finite list of events is ready immediately")
+ .await;
+
+ assert!(events
+ .iter()
+ .all(|(event, message)| message == &event.message));
+
+ let (event, _) = events.last().expect("this vec is non-empty");
+
+ // Take the last one's resume point
+
+ event.sequence()
+ };
+
+ // Resume after disconnect
+ let resume_at = {
+ let resume_messages = [
+ // Note that channel_b does not appear here. The buggy behaviour
+ // would be masked if channel_b happened to send a new message
+ // into the resumed event stream.
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ ];
+
+ // Second subscription
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expected events
+
+ let events = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .zip(stream::iter(resume_messages))
+ .collect::<Vec<_>>()
+ .expect_ready("zipping a finite list of events is ready immediately")
+ .await;
+
+ assert!(events
+ .iter()
+ .all(|(event, message)| message == &event.message));
+
+ let (event, _) = events.last().expect("this vec is non-empty");
+
+ // Take the last one's resume point
+
+ event.sequence()
+ };
+
+ // Resume after disconnect a second time
+ {
+ // At this point, we can send on either channel and demonstrate the
+ // problem. The resume point should before both of these messages, but
+ // after _all_ prior messages.
+ let final_messages = [
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
+ ];
+
+ // Third subscription
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expected events
+
+ let events = events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .zip(stream::iter(final_messages))
+ .collect::<Vec<_>>()
+ .expect_ready("zipping a finite list of events is ready immediately")
+ .await;
+
+ assert!(events
+ .iter()
+ .all(|(event, message)| message == &event.message));
+ };
+}
diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs
new file mode 100644
index 0000000..007b03d
--- /dev/null
+++ b/src/event/routes/test/setup.rs
@@ -0,0 +1,45 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::{
+ event::routes::get,
+ test::fixtures::{self, future::Expect as _},
+};
+
+// There's no test for this in subscribe-then-setup order because creating an
+// identity to subscribe with also completes initial setup, preventing the
+// test from running. That is also a can't-happen scenario in reality.
+#[tokio::test]
+async fn previously_completed() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Complete initial setup
+
+ let (name, password) = fixtures::login::propose();
+ let (owner, _) = app
+ .setup()
+ .initial(&name, &password, &fixtures::now())
+ .await
+ .expect("initial setup in an empty app succeeds");
+
+ // Subscribe to events
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Expect a login created event
+
+ let _ = events
+ .filter_map(fixtures::event::login)
+ .filter_map(fixtures::event::login::created)
+ .filter(|event| future::ready(event.login == owner))
+ .next()
+ .expect_some("a login created event is sent")
+ .await;
+}
diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs
new file mode 100644
index 0000000..2039d9b
--- /dev/null
+++ b/src/event/routes/test/token.rs
@@ -0,0 +1,95 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::{
+ event::routes::get,
+ test::fixtures::{self, future::Expect as _},
+};
+
+#[tokio::test]
+async fn terminates_on_token_expiry() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
+ let subscriber =
+ fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await;
+
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.tokens()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
+ .next()
+ .expect_none("end of stream")
+ .await;
+}
+
+#[tokio::test]
+async fn terminates_on_logout() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.tokens()
+ .logout(&subscriber.token)
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
+ .next()
+ .expect_none("end of stream")
+ .await;
+}
diff --git a/src/invite/app.rs b/src/invite/app.rs
index 64ba753..176075f 100644
--- a/src/invite/app.rs
+++ b/src/invite/app.rs
@@ -5,7 +5,7 @@ use super::{repo::Provider as _, Id, Invite, Summary};
use crate::{
clock::DateTime,
db::{Duplicate as _, NotFound as _},
- event::repo::Provider as _,
+ event::{repo::Provider as _, Broadcaster, Event},
login::{repo::Provider as _, Login, Password},
name::Name,
token::{repo::Provider as _, Secret},
@@ -13,18 +13,15 @@ use crate::{
pub struct Invites<'a> {
db: &'a SqlitePool,
+ events: &'a Broadcaster,
}
impl<'a> Invites<'a> {
- pub const fn new(db: &'a SqlitePool) -> Self {
- Self { db }
+ pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self {
+ Self { db, events }
}
- pub async fn create(
- &self,
- issuer: &Login,
- issued_at: &DateTime,
- ) -> Result<Invite, sqlx::Error> {
+ pub async fn issue(&self, issuer: &Login, issued_at: &DateTime) -> Result<Invite, sqlx::Error> {
let mut tx = self.db.begin().await?;
let invite = tx.invites().create(issuer, issued_at).await?;
tx.commit().await?;
@@ -73,6 +70,9 @@ impl<'a> Invites<'a> {
let secret = tx.tokens().issue(&login, accepted_at).await?;
tx.commit().await?;
+ self.events
+ .broadcast(login.events().map(Event::from).collect::<Vec<_>>());
+
Ok((login.as_created(), secret))
}
diff --git a/src/invite/mod.rs b/src/invite/mod.rs
index d59fb9c..53ca984 100644
--- a/src/invite/mod.rs
+++ b/src/invite/mod.rs
@@ -14,7 +14,7 @@ pub struct Invite {
pub issued_at: DateTime,
}
-#[derive(serde::Serialize)]
+#[derive(Debug, serde::Serialize)]
pub struct Summary {
pub id: Id,
pub issuer: nfc::String,
diff --git a/src/invite/routes/invite/mod.rs b/src/invite/routes/invite/mod.rs
index 04593fd..c22029a 100644
--- a/src/invite/routes/invite/mod.rs
+++ b/src/invite/routes/invite/mod.rs
@@ -1,4 +1,6 @@
pub mod get;
pub mod post;
+#[cfg(test)]
+pub mod test;
type PathInfo = crate::invite::Id;
diff --git a/src/invite/routes/invite/post.rs b/src/invite/routes/invite/post.rs
index 3ca4e6b..0dd8dba 100644
--- a/src/invite/routes/invite/post.rs
+++ b/src/invite/routes/invite/post.rs
@@ -36,7 +36,8 @@ pub struct Request {
pub password: Password,
}
-pub struct Error(app::AcceptError);
+#[derive(Debug)]
+pub struct Error(pub app::AcceptError);
impl IntoResponse for Error {
fn into_response(self) -> Response {
diff --git a/src/invite/routes/invite/test/get.rs b/src/invite/routes/invite/test/get.rs
new file mode 100644
index 0000000..c6780ed
--- /dev/null
+++ b/src/invite/routes/invite/test/get.rs
@@ -0,0 +1,65 @@
+use axum::extract::{Json, Path, State};
+
+use crate::{invite::routes::invite::get, test::fixtures};
+
+#[tokio::test]
+async fn valid_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::login::create(&app, &fixtures::now()).await;
+ let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+
+ // Call endpoint
+
+ let Json(response) = get::handler(State(app), Path(invite.id))
+ .await
+ .expect("get for an existing invite succeeds");
+
+ // Verify response
+
+ assert_eq!(issuer.name.display(), &response.issuer);
+ assert_eq!(invite.issued_at, response.issued_at);
+}
+
+#[tokio::test]
+async fn nonexistent_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Call endpoint
+
+ let invite = fixtures::invite::fictitious();
+ let error = get::handler(State(app), Path(invite.clone()))
+ .await
+ .expect_err("get for a nonexistent invite fails");
+
+ // Verify response
+
+ assert!(matches!(error, get::Error::NotFound(error_id) if invite == error_id));
+}
+
+#[tokio::test]
+async fn expired_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let invite = fixtures::invite::issue(&app, &issuer, &fixtures::ancient()).await;
+
+ app.invites()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring invites never fails");
+
+ // Call endpoint
+
+ let error = get::handler(State(app), Path(invite.id.clone()))
+ .await
+ .expect_err("get for an expired invite fails");
+
+ // Verify response
+
+ assert!(matches!(error, get::Error::NotFound(error_id) if invite.id == error_id));
+}
diff --git a/src/invite/routes/invite/test/mod.rs b/src/invite/routes/invite/test/mod.rs
new file mode 100644
index 0000000..d6c1f06
--- /dev/null
+++ b/src/invite/routes/invite/test/mod.rs
@@ -0,0 +1,2 @@
+mod get;
+mod post;
diff --git a/src/invite/routes/invite/test/post.rs b/src/invite/routes/invite/test/post.rs
new file mode 100644
index 0000000..65ab61e
--- /dev/null
+++ b/src/invite/routes/invite/test/post.rs
@@ -0,0 +1,208 @@
+use axum::extract::{Json, Path, State};
+
+use crate::{
+ invite::{app::AcceptError, routes::invite::post},
+ name::Name,
+ test::fixtures,
+};
+
+#[tokio::test]
+async fn valid_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::login::create(&app, &fixtures::now()).await;
+ let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let (name, password) = fixtures::login::propose();
+ let identity = fixtures::cookie::not_logged_in();
+ let request = post::Request {
+ name: name.clone(),
+ password: password.clone(),
+ };
+ let (identity, Json(response)) = post::handler(
+ State(app.clone()),
+ fixtures::now(),
+ identity,
+ Path(invite.id),
+ Json(request),
+ )
+ .await
+ .expect("accepting a valid invite succeeds");
+
+ // Verify the response
+
+ assert!(identity.secret().is_some());
+ assert_eq!(name, response.name);
+
+ // Verify that the issued token is valid
+
+ let secret = identity
+ .secret()
+ .expect("newly-issued identity has a token secret");
+ let (_, login) = app
+ .tokens()
+ .validate(&secret, &fixtures::now())
+ .await
+ .expect("newly-issued identity cookie is valid");
+ assert_eq!(response, login);
+
+ // Verify that the given credentials can log in
+
+ let (login, _) = app
+ .tokens()
+ .login(&name, &password, &fixtures::now())
+ .await
+ .expect("credentials given on signup are valid");
+ assert_eq!(response, login);
+}
+
+#[tokio::test]
+async fn nonexistent_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let invite = fixtures::invite::fictitious();
+
+ // Call the endpoint
+
+ let (name, password) = fixtures::login::propose();
+ let identity = fixtures::cookie::not_logged_in();
+ let request = post::Request {
+ name: name.clone(),
+ password: password.clone(),
+ };
+ let post::Error(error) = post::handler(
+ State(app.clone()),
+ fixtures::now(),
+ identity,
+ Path(invite.clone()),
+ Json(request),
+ )
+ .await
+ .expect_err("accepting a nonexistent invite fails");
+
+ // Verify the response
+
+ assert!(matches!(error, AcceptError::NotFound(error_id) if error_id == invite));
+}
+
+#[tokio::test]
+async fn expired_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let invite = fixtures::invite::issue(&app, &issuer, &fixtures::ancient()).await;
+
+ app.invites()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring invites never fails");
+
+ // Call the endpoint
+
+ let (name, password) = fixtures::login::propose();
+ let identity = fixtures::cookie::not_logged_in();
+ let request = post::Request {
+ name: name.clone(),
+ password: password.clone(),
+ };
+ let post::Error(error) = post::handler(
+ State(app.clone()),
+ fixtures::now(),
+ identity,
+ Path(invite.id.clone()),
+ Json(request),
+ )
+ .await
+ .expect_err("accepting a nonexistent invite fails");
+
+ // Verify the response
+
+ assert!(matches!(error, AcceptError::NotFound(error_id) if error_id == invite.id));
+}
+
+#[tokio::test]
+async fn accepted_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let invite = fixtures::invite::issue(&app, &issuer, &fixtures::ancient()).await;
+
+ let (name, password) = fixtures::login::propose();
+ app.invites()
+ .accept(&invite.id, &name, &password, &fixtures::now())
+ .await
+ .expect("accepting a valid invite succeeds");
+
+ // Call the endpoint
+
+ let (name, password) = fixtures::login::propose();
+ let identity = fixtures::cookie::not_logged_in();
+ let request = post::Request {
+ name: name.clone(),
+ password: password.clone(),
+ };
+ let post::Error(error) = post::handler(
+ State(app.clone()),
+ fixtures::now(),
+ identity,
+ Path(invite.id.clone()),
+ Json(request),
+ )
+ .await
+ .expect_err("accepting a nonexistent invite fails");
+
+ // Verify the response
+
+ assert!(matches!(error, AcceptError::NotFound(error_id) if error_id == invite.id));
+}
+
+#[tokio::test]
+async fn conflicting_name() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let invite = fixtures::invite::issue(&app, &issuer, &fixtures::ancient()).await;
+
+ let existing_name = Name::from("rijksmuseum");
+ app.logins()
+ .create(
+ &existing_name,
+ &fixtures::login::propose_password(),
+ &fixtures::now(),
+ )
+ .await
+ .expect("creating a login in an empty environment succeeds");
+
+ // Call the endpoint
+
+ let conflicting_name = Name::from("r\u{0133}ksmuseum");
+ let password = fixtures::login::propose_password();
+
+ let identity = fixtures::cookie::not_logged_in();
+ let request = post::Request {
+ name: conflicting_name.clone(),
+ password: password.clone(),
+ };
+ let post::Error(error) = post::handler(
+ State(app.clone()),
+ fixtures::now(),
+ identity,
+ Path(invite.id.clone()),
+ Json(request),
+ )
+ .await
+ .expect_err("accepting a nonexistent invite fails");
+
+ // Verify the response
+
+ assert!(
+ matches!(error, AcceptError::DuplicateLogin(error_name) if error_name == conflicting_name)
+ );
+}
diff --git a/src/invite/routes/mod.rs b/src/invite/routes/mod.rs
index dae20ba..2f7375c 100644
--- a/src/invite/routes/mod.rs
+++ b/src/invite/routes/mod.rs
@@ -7,6 +7,8 @@ use crate::app::App;
mod invite;
mod post;
+#[cfg(test)]
+mod test;
pub fn router() -> Router<App> {
Router::new()
diff --git a/src/invite/routes/post.rs b/src/invite/routes/post.rs
index eb7d706..898081e 100644
--- a/src/invite/routes/post.rs
+++ b/src/invite/routes/post.rs
@@ -10,7 +10,7 @@ pub async fn handler(
identity: Identity,
_: Json<Request>,
) -> Result<Json<Invite>, Internal> {
- let invite = app.invites().create(&identity.login, &issued_at).await?;
+ let invite = app.invites().issue(&identity.login, &issued_at).await?;
Ok(Json(invite))
}
diff --git a/src/invite/routes/test.rs b/src/invite/routes/test.rs
new file mode 100644
index 0000000..4d99660
--- /dev/null
+++ b/src/invite/routes/test.rs
@@ -0,0 +1,28 @@
+use axum::extract::{Json, State};
+
+use super::post;
+use crate::test::fixtures;
+
+#[tokio::test]
+async fn create_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::identity::create(&app, &fixtures::now()).await;
+ let issued_at = fixtures::now();
+
+ // Call the endpoint
+
+ let Json(invite) = post::handler(
+ State(app),
+ issued_at.clone(),
+ issuer.clone(),
+ Json(post::Request {}),
+ )
+ .await
+ .expect("creating an invite always succeeds");
+
+ // Verify the response
+ assert_eq!(issuer.login.id, invite.issuer);
+ assert_eq!(&*issued_at, &invite.issued_at);
+}
diff --git a/src/login/password.rs b/src/login/password.rs
index c27c950..e1d164e 100644
--- a/src/login/password.rs
+++ b/src/login/password.rs
@@ -31,7 +31,7 @@ impl fmt::Debug for StoredHash {
}
}
-#[derive(serde::Deserialize)]
+#[derive(Clone, serde::Deserialize)]
#[serde(transparent)]
pub struct Password(nfc::String);
diff --git a/src/message/routes/message/mod.rs b/src/message/routes/message/mod.rs
index 545ad26..45a7e9d 100644
--- a/src/message/routes/message/mod.rs
+++ b/src/message/routes/message/mod.rs
@@ -3,9 +3,9 @@ mod test;
pub mod delete {
use axum::{
- extract::{Path, State},
+ extract::{Json, Path, State},
http::StatusCode,
- response::{IntoResponse, Response},
+ response::{self, IntoResponse},
};
use crate::{
@@ -21,10 +21,21 @@ pub mod delete {
Path(message): Path<message::Id>,
RequestedAt(deleted_at): RequestedAt,
_: Identity,
- ) -> Result<StatusCode, Error> {
+ ) -> Result<Response, Error> {
app.messages().delete(&message, &deleted_at).await?;
- Ok(StatusCode::ACCEPTED)
+ Ok(Response { id: message })
+ }
+
+ #[derive(Debug, serde::Serialize)]
+ pub struct Response {
+ pub id: message::Id,
+ }
+
+ impl IntoResponse for Response {
+ fn into_response(self) -> response::Response {
+ (StatusCode::ACCEPTED, Json(self)).into_response()
+ }
}
#[derive(Debug, thiserror::Error)]
@@ -32,7 +43,7 @@ pub mod delete {
pub struct Error(#[from] pub DeleteError);
impl IntoResponse for Error {
- fn into_response(self) -> Response {
+ fn into_response(self) -> response::Response {
let Self(error) = self;
#[allow(clippy::match_wildcard_for_single_variants)]
match error {
diff --git a/src/message/routes/message/test.rs b/src/message/routes/message/test.rs
index 2016fb8..ae89506 100644
--- a/src/message/routes/message/test.rs
+++ b/src/message/routes/message/test.rs
@@ -1,7 +1,4 @@
-use axum::{
- extract::{Path, State},
- http::StatusCode,
-};
+use axum::extract::{Path, State};
use super::delete;
use crate::{message::app, test::fixtures};
@@ -29,7 +26,7 @@ pub async fn delete_message() {
// Verify the response
- assert_eq!(response, StatusCode::ACCEPTED);
+ assert_eq!(message.id, response.id);
// Verify the semantics
diff --git a/src/setup/routes/mod.rs b/src/setup/routes/mod.rs
index e1e1711..6054983 100644
--- a/src/setup/routes/mod.rs
+++ b/src/setup/routes/mod.rs
@@ -3,6 +3,8 @@ use axum::{routing::post, Router};
use crate::app::App;
mod post;
+#[cfg(test)]
+mod test;
pub fn router() -> Router<App> {
Router::new().route("/api/setup", post(post::handler))
diff --git a/src/setup/routes/test.rs b/src/setup/routes/test.rs
new file mode 100644
index 0000000..f7562ae
--- /dev/null
+++ b/src/setup/routes/test.rs
@@ -0,0 +1,69 @@
+use axum::extract::{Json, State};
+
+use super::post;
+use crate::{setup::app, test::fixtures};
+
+#[tokio::test]
+async fn fresh_instance() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Call the endpoint
+ let identity = fixtures::cookie::not_logged_in();
+ let (name, password) = fixtures::login::propose();
+ let request = post::Request {
+ name: name.clone(),
+ password: password.clone(),
+ };
+ let (identity, Json(response)) =
+ post::handler(State(app.clone()), fixtures::now(), identity, Json(request))
+ .await
+ .expect("setup in a fresh app succeeds");
+
+ // Verify the response
+
+ assert_eq!(name, response.name);
+
+ // Verify that the issued token is valid
+
+ let secret = identity
+ .secret()
+ .expect("newly-issued identity has a token secret");
+ let (_, login) = app
+ .tokens()
+ .validate(&secret, &fixtures::now())
+ .await
+ .expect("newly-issued identity cookie is valid");
+ assert_eq!(response, login);
+
+ // Verify that the given credentials can log in
+
+ let (login, _) = app
+ .tokens()
+ .login(&name, &password, &fixtures::now())
+ .await
+ .expect("credentials given on signup are valid");
+ assert_eq!(response, login);
+}
+
+#[tokio::test]
+async fn login_exists() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ fixtures::login::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+ let identity = fixtures::cookie::not_logged_in();
+ let (name, password) = fixtures::login::propose();
+ let request = post::Request { name, password };
+ let post::Error(error) =
+ post::handler(State(app.clone()), fixtures::now(), identity, Json(request))
+ .await
+ .expect_err("setup in a populated app fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::Error::SetupCompleted));
+}
diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs
index 8cb38ae..0c6480b 100644
--- a/src/test/fixtures/channel.rs
+++ b/src/test/fixtures/channel.rs
@@ -1,5 +1,3 @@
-use std::future;
-
use faker_rand::{
en_us::{addresses::CityName, names::FullName},
faker_impl_from_templates,
@@ -10,7 +8,6 @@ use crate::{
app::App,
channel::{self, Channel},
clock::RequestedAt,
- event::Event,
name::Name,
};
@@ -31,20 +28,6 @@ faker_impl_from_templates! {
NameTemplate; "{} {}", CityName, FullName;
}
-pub fn events(event: Event) -> future::Ready<Option<channel::Event>> {
- future::ready(match event {
- Event::Channel(channel) => Some(channel),
- _ => None,
- })
-}
-
-pub fn created(event: channel::Event) -> future::Ready<Option<channel::event::Created>> {
- future::ready(match event {
- channel::Event::Created(event) => Some(event),
- channel::Event::Deleted(_) => None,
- })
-}
-
pub fn fictitious() -> channel::Id {
channel::Id::generate()
}
diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs
index fa4fbc0..de02d4d 100644
--- a/src/test/fixtures/event.rs
+++ b/src/test/fixtures/event.rs
@@ -1,8 +1,79 @@
-use crate::message::{Event, Message};
+use std::future::{self, Ready};
-pub fn message_sent(event: &Event, message: &Message) -> bool {
- matches!(
- &event,
- Event::Sent(event) if message == &event.into()
- )
+use crate::event::Event;
+
+pub fn channel(event: Event) -> Ready<Option<channel::Event>> {
+ future::ready(match event {
+ Event::Channel(channel) => Some(channel),
+ _ => None,
+ })
+}
+
+pub fn message(event: Event) -> Ready<Option<message::Event>> {
+ future::ready(match event {
+ Event::Message(event) => Some(event),
+ _ => None,
+ })
+}
+
+pub fn login(event: Event) -> Ready<Option<login::Event>> {
+ future::ready(match event {
+ Event::Login(event) => Some(event),
+ _ => None,
+ })
+}
+
+pub mod channel {
+ use std::future::{self, Ready};
+
+ use crate::channel::event;
+ pub use crate::channel::Event;
+
+ pub fn created(event: Event) -> Ready<Option<event::Created>> {
+ future::ready(match event {
+ Event::Created(event) => Some(event),
+ Event::Deleted(_) => None,
+ })
+ }
+
+ pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> {
+ future::ready(match event {
+ Event::Deleted(event) => Some(event),
+ Event::Created(_) => None,
+ })
+ }
+}
+
+pub mod message {
+ use std::future::{self, Ready};
+
+ use crate::message::event;
+ pub use crate::message::Event;
+
+ pub fn sent(event: Event) -> Ready<Option<event::Sent>> {
+ future::ready(match event {
+ Event::Sent(event) => Some(event),
+ Event::Deleted(_) => None,
+ })
+ }
+
+ pub fn deleted(event: Event) -> future::Ready<Option<event::Deleted>> {
+ future::ready(match event {
+ Event::Deleted(event) => Some(event),
+ Event::Sent(_) => None,
+ })
+ }
+}
+
+pub mod login {
+ use std::future::{self, Ready};
+
+ use crate::login::event;
+ pub use crate::login::Event;
+
+ pub fn created(event: Event) -> Ready<Option<event::Created>> {
+ future::ready(match event {
+ Event::Created(event) => Some(event),
+ })
+ }
}
diff --git a/src/test/fixtures/future.rs b/src/test/fixtures/future.rs
index bbdc9f8..2f810a3 100644
--- a/src/test/fixtures/future.rs
+++ b/src/test/fixtures/future.rs
@@ -1,55 +1,221 @@
-use std::{future::IntoFuture, time::Duration};
+use std::{future::Future, pin::Pin, task};
-use futures::{stream, Stream};
-use tokio::time::timeout;
+use futures::stream;
-async fn immediately<F>(fut: F) -> F::Output
+// Combinators for futures that prevent waits, even when the underlying future
+// would block.
+//
+// These are only useful for futures with no bound on how long they may wait,
+// and this trait is only implemented on futures that are likely to have that
+// characteristic. Trying to apply this to futures that already have some
+// bounded wait time may make tests fail inappropriately and can hide other
+// logic errors.
+pub trait Expect: Sized {
+ // The returned future expects the underlying future to be ready immediately,
+ // and panics with the provided message if it is not.
+ //
+ // For stream operations, can be used to assert immediate completion.
+ fn expect_ready(self, message: &str) -> Ready<Self>
+ where
+ Self: Future;
+
+ // The returned future expects the underlying future _not_ to be ready, and
+ // panics if it is. This is usually a useful proxy for "I expect this to never
+ // arrive" or "to not be here yet." The future is transformed to return `()`,
+ // since the underlying future can never provide a value.
+ //
+ // For stream operations, can be used to assert that completion hasn't happened
+ // yet.
+ fn expect_wait(self, message: &str) -> Wait<Self>
+ where
+ Self: Future;
+
+ // The returned future expects the underlying future to resolve immediately, to
+ // a `Some` value. If it resolves to `None` or is not ready, it panics. The
+ // future is transformed to return the inner value from the `Some` case, like
+ // [`Option::expect`].
+ //
+ // For stream operations, can be used to assert that the stream has at least one
+ // message.
+ fn expect_some<T>(self, message: &str) -> Some<Self>
+ where
+ Self: Future<Output = Option<T>>;
+
+ // The returned future expects the underlying future to resolve immediately, to
+ // a `None` value. If it resolves to `Some(_)`, or is not ready, it panics. The
+ // future is transformed to return `()`, since the underlying future's value is
+ // fixed.
+ //
+ // For stream operations, can be used to assert that the stream has ended.
+ fn expect_none<T>(self, message: &str) -> None<Self>
+ where
+ Self: Future<Output = Option<T>>;
+}
+
+impl<'a, St> Expect for stream::Next<'a, St> {
+ fn expect_ready(self, message: &str) -> Ready<Self> {
+ Ready {
+ future: self,
+ message,
+ }
+ }
+
+ fn expect_wait(self, message: &str) -> Wait<Self> {
+ Wait {
+ future: self,
+ message,
+ }
+ }
+
+ fn expect_some<T>(self, message: &str) -> Some<Self>
+ where
+ Self: Future<Output = Option<T>>,
+ {
+ Some {
+ future: self,
+ message,
+ }
+ }
+
+ fn expect_none<T>(self, message: &str) -> None<Self>
+ where
+ Self: Future<Output = Option<T>>,
+ {
+ None {
+ future: self,
+ message,
+ }
+ }
+}
+
+impl<St, C> Expect for stream::Collect<St, C> {
+ fn expect_ready(self, message: &str) -> Ready<Self> {
+ Ready {
+ future: self,
+ message,
+ }
+ }
+
+ fn expect_wait(self, message: &str) -> Wait<Self> {
+ Wait {
+ future: self,
+ message,
+ }
+ }
+
+ fn expect_some<T>(self, message: &str) -> Some<Self>
+ where
+ Self: Future<Output = Option<T>>,
+ {
+ Some {
+ future: self,
+ message,
+ }
+ }
+
+ fn expect_none<T>(self, message: &str) -> None<Self>
+ where
+ Self: Future<Output = Option<T>>,
+ {
+ None {
+ future: self,
+ message,
+ }
+ }
+}
+
+#[pin_project::pin_project]
+pub struct Ready<'m, F> {
+ #[pin]
+ future: F,
+ message: &'m str,
+}
+
+impl<'m, F> Future for Ready<'m, F>
where
- F: IntoFuture,
+ F: Future + std::fmt::Debug,
{
- // I haven't been particularly rigorous here. Zero delay _seems to work_,
- // but this can be set higher; it makes tests that fail to meet the
- // "immediate" expectation take longer, but gives slow tests time to
- // succeed, as well.
- let duration = Duration::from_nanos(0);
- timeout(duration, fut)
- .await
- .expect("expected result immediately")
-}
-
-// This is only intended for streams, since their `next()`, `collect()`, and
-// so on can all block indefinitely on an empty stream. There's no need to
-// force immediacy on futures that "can't" block forever, and it can hide logic
-// errors if you do that.
-//
-// The impls below _could_ be replaced with a blanket impl for all future
-// types, otherwise. The choice to restrict impls to stream futures is
-// deliberate.
-pub trait Immediately {
- type Output;
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
+ let this = self.project();
+
+ if let task::Poll::Ready(value) = this.future.poll(cx) {
+ task::Poll::Ready(value)
+ } else {
+ panic!("{}", this.message);
+ }
+ }
+}
+
+#[pin_project::pin_project]
+pub struct Wait<'m, F> {
+ #[pin]
+ future: F,
+ message: &'m str,
+}
- async fn immediately(self) -> Self::Output;
+impl<'m, F> Future for Wait<'m, F>
+where
+ F: Future + std::fmt::Debug,
+{
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
+ let this = self.project();
+
+ if this.future.poll(cx).is_pending() {
+ task::Poll::Ready(())
+ } else {
+ panic!("{}", this.message);
+ }
+ }
}
-impl<'a, St> Immediately for stream::Next<'a, St>
+#[pin_project::pin_project]
+pub struct Some<'m, F> {
+ #[pin]
+ future: F,
+ message: &'m str,
+}
+
+impl<'m, F, T> Future for Some<'m, F>
where
- St: Stream + Unpin + ?Sized,
+ F: Future<Output = Option<T>> + std::fmt::Debug,
{
- type Output = Option<<St as Stream>::Item>;
+ type Output = T;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
+ let this = self.project();
- async fn immediately(self) -> Self::Output {
- immediately(self).await
+ if let task::Poll::Ready(Option::Some(value)) = this.future.poll(cx) {
+ task::Poll::Ready(value)
+ } else {
+ panic!("{}", this.message)
+ }
}
}
-impl<St, C> Immediately for stream::Collect<St, C>
+#[pin_project::pin_project]
+pub struct None<'m, F> {
+ #[pin]
+ future: F,
+ message: &'m str,
+}
+
+impl<'m, F, T> Future for None<'m, F>
where
- St: Stream,
- C: Default + Extend<<St as Stream>::Item>,
+ F: Future<Output = Option<T>> + std::fmt::Debug,
{
- type Output = C;
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
+ let this = self.project();
- async fn immediately(self) -> Self::Output {
- immediately(self).await
+ if let task::Poll::Ready(Option::None) = this.future.poll(cx) {
+ task::Poll::Ready(())
+ } else {
+ panic!("{}", this.message)
+ }
}
}
diff --git a/src/test/fixtures/invite.rs b/src/test/fixtures/invite.rs
new file mode 100644
index 0000000..654d1b4
--- /dev/null
+++ b/src/test/fixtures/invite.rs
@@ -0,0 +1,17 @@
+use crate::{
+ app::App,
+ clock::DateTime,
+ invite::{self, Invite},
+ login::Login,
+};
+
+pub async fn issue(app: &App, issuer: &Login, issued_at: &DateTime) -> Invite {
+ app.invites()
+ .issue(issuer, issued_at)
+ .await
+ .expect("issuing invites never fails")
+}
+
+pub fn fictitious() -> invite::Id {
+ invite::Id::generate()
+}
diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs
index 3aebdd9..d3b4719 100644
--- a/src/test/fixtures/message.rs
+++ b/src/test/fixtures/message.rs
@@ -1,12 +1,9 @@
-use std::future;
-
use faker_rand::lorem::Paragraphs;
use crate::{
app::App,
channel::Channel,
clock::RequestedAt,
- event::Event,
login::Login,
message::{self, Body, Message},
};
@@ -24,13 +21,6 @@ pub fn propose() -> Body {
rand::random::<Paragraphs>().to_string().into()
}
-pub fn events(event: Event) -> future::Ready<Option<message::Event>> {
- future::ready(match event {
- Event::Message(event) => Some(event),
- _ => None,
- })
-}
-
pub fn fictitious() -> message::Id {
message::Id::generate()
}
diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs
index 9111811..2b7b6af 100644
--- a/src/test/fixtures/mod.rs
+++ b/src/test/fixtures/mod.rs
@@ -7,6 +7,7 @@ pub mod cookie;
pub mod event;
pub mod future;
pub mod identity;
+pub mod invite;
pub mod login;
pub mod message;
diff --git a/src/ui/mime.rs b/src/ui/mime.rs
index 9c724f0..7818ac1 100644
--- a/src/ui/mime.rs
+++ b/src/ui/mime.rs
@@ -1,7 +1,10 @@
use mime::Mime;
use unix_path::Path;
-// Extremely manual; using `std::path` here would result in platform-dependent behaviour when it's not appropriate (the URLs passed here always use `/` and are parsed like URLs). Using `unix_path` might be an option, but it's not clearly
+// Extremely manual; using `std::path` here would result in platform-dependent
+// behaviour when it's not appropriate (the URLs passed here always use `/` and
+// are parsed like URLs). Using `unix_path` might be an option, but it's not
+// clearly
pub fn from_path<P>(path: P) -> Result<Mime, mime::FromStrError>
where
P: AsRef<Path>,
diff --git a/src/ui/routes/ch/channel.rs b/src/ui/routes/ch/channel.rs
index a338f1f..a854f14 100644
--- a/src/ui/routes/ch/channel.rs
+++ b/src/ui/routes/ch/channel.rs
@@ -6,7 +6,7 @@ pub mod get {
use crate::{
app::App,
- channel,
+ channel::{self, app},
error::Internal,
token::extract::Identity,
ui::{
@@ -21,18 +21,14 @@ pub mod get {
Path(channel): Path<channel::Id>,
) -> Result<Asset, Error> {
let _ = identity.ok_or(Error::NotLoggedIn)?;
- app.channels()
- .get(&channel)
- .await
- .map_err(Error::internal)?
- .ok_or(Error::NotFound)?;
+ app.channels().get(&channel).await.map_err(Error::from)?;
Assets::index().map_err(Error::Internal)
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
- #[error("requested channel not found")]
+ #[error("channel not found")]
NotFound,
#[error("not logged in")]
NotLoggedIn,
@@ -40,9 +36,12 @@ pub mod get {
Internal(Internal),
}
- impl Error {
- fn internal(err: impl Into<Internal>) -> Self {
- Self::Internal(err.into())
+ impl From<app::Error> for Error {
+ fn from(error: app::Error) -> Self {
+ match error {
+ app::Error::NotFound(_) | app::Error::Deleted(_) => Self::NotFound,
+ other => Self::Internal(other.into()),
+ }
}
}