diff options
| author | Kit La Touche <kit@transneptune.net> | 2024-10-25 22:16:03 -0400 |
|---|---|---|
| committer | Kit La Touche <kit@transneptune.net> | 2024-10-25 22:16:03 -0400 |
| commit | a50911a03c8955e08c77b0f3764dbda963013971 (patch) | |
| tree | 9f5319191438b85b860ba06c9a203d3f129072a1 /src | |
| parent | 4c49283553f4b18bb2a74de280b340a073e3253e (diff) | |
| parent | c87b5c53077c02bf21234e24bf976aa7a5f2bac8 (diff) | |
Merge branch 'main' into wip/mobile
Diffstat (limited to 'src')
38 files changed, 1805 insertions, 610 deletions
@@ -44,7 +44,7 @@ impl App { } pub const fn invites(&self) -> Invites { - Invites::new(&self.db) + Invites::new(&self.db, &self.events) } #[cfg(not(test))] diff --git a/src/channel/app.rs b/src/channel/app.rs index 7bfa0f7..8359277 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -4,7 +4,7 @@ use sqlx::sqlite::SqlitePool; use super::{ repo::{LoadError, Provider as _}, - Channel, History, Id, + Channel, Id, }; use crate::{ clock::DateTime, @@ -42,12 +42,14 @@ impl<'a> Channels<'a> { // This function is careless with respect to time, and gets you the channel as // it exists in the specific moment when you call it. - pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, Error> { + pub async fn get(&self, channel: &Id) -> Result<Channel, Error> { + let not_found = || Error::NotFound(channel.clone()); + let mut tx = self.db.begin().await?; - let channel = tx.channels().by_id(channel).await.optional()?; + let channel = tx.channels().by_id(channel).await.not_found(not_found)?; tx.commit().await?; - Ok(channel.as_ref().and_then(History::as_snapshot)) + channel.as_snapshot().ok_or_else(not_found) } pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { diff --git a/src/channel/routes/channel/delete.rs b/src/channel/routes/channel/delete.rs index 91eb506..2d2b5f1 100644 --- a/src/channel/routes/channel/delete.rs +++ b/src/channel/routes/channel/delete.rs @@ -1,12 +1,12 @@ use axum::{ - extract::{Path, State}, + extract::{Json, Path, State}, http::StatusCode, - response::{IntoResponse, Response}, + response::{self, IntoResponse}, }; use crate::{ app::App, - channel::app, + channel::{self, app}, clock::RequestedAt, error::{Internal, NotFound}, token::extract::Identity, @@ -17,10 +17,21 @@ pub async fn handler( Path(channel): Path<super::PathInfo>, RequestedAt(deleted_at): RequestedAt, _: Identity, -) -> Result<StatusCode, Error> { +) -> Result<Response, Error> { app.channels().delete(&channel, &deleted_at).await?; - Ok(StatusCode::ACCEPTED) + Ok(Response { id: channel }) +} + +#[derive(Debug, serde::Serialize)] +pub struct Response { + pub id: channel::Id, +} + +impl IntoResponse for Response { + fn into_response(self) -> response::Response { + (StatusCode::ACCEPTED, Json(self)).into_response() + } } #[derive(Debug, thiserror::Error)] @@ -28,7 +39,7 @@ pub async fn handler( pub struct Error(#[from] pub app::Error); impl IntoResponse for Error { - fn into_response(self) -> Response { + fn into_response(self) -> response::Response { let Self(error) = self; #[allow(clippy::match_wildcard_for_single_variants)] match error { diff --git a/src/channel/routes/channel/test/delete.rs b/src/channel/routes/channel/test/delete.rs index e9af12f..0371b0a 100644 --- a/src/channel/routes/channel/test/delete.rs +++ b/src/channel/routes/channel/test/delete.rs @@ -1,7 +1,4 @@ -use axum::{ - extract::{Path, State}, - http::StatusCode, -}; +use axum::extract::{Path, State}; use crate::{ channel::{app, routes::channel::delete}, @@ -9,7 +6,7 @@ use crate::{ }; #[tokio::test] -pub async fn delete_channel() { +pub async fn valid_channel() { // Set up the environment let app = fixtures::scratch_app().await; @@ -29,7 +26,7 @@ pub async fn delete_channel() { // Verify the response - assert_eq!(response, StatusCode::ACCEPTED); + assert_eq!(channel.id, response.id); // Verify the semantics @@ -38,7 +35,7 @@ pub async fn delete_channel() { } #[tokio::test] -pub async fn delete_invalid_channel_id() { +pub async fn invalid_channel_id() { // Set up the environment let app = fixtures::scratch_app().await; @@ -62,7 +59,7 @@ pub async fn delete_invalid_channel_id() { } #[tokio::test] -pub async fn delete_deleted() { +pub async fn channel_deleted() { // Set up the environment let app = fixtures::scratch_app().await; @@ -91,7 +88,7 @@ pub async fn delete_deleted() { } #[tokio::test] -pub async fn delete_expired() { +pub async fn channel_expired() { // Set up the environment let app = fixtures::scratch_app().await; @@ -120,7 +117,7 @@ pub async fn delete_expired() { } #[tokio::test] -pub async fn delete_purged() { +pub async fn channel_purged() { // Set up the environment let app = fixtures::scratch_app().await; diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs index 67e7d36..111a703 100644 --- a/src/channel/routes/channel/test/post.rs +++ b/src/channel/routes/channel/test/post.rs @@ -1,11 +1,11 @@ use axum::extract::{Json, Path, State}; -use futures::stream::StreamExt; +use futures::stream::{self, StreamExt as _}; use crate::{ channel::{self, routes::channel::post}, event::Sequenced, - message::{self, app::SendError}, - test::fixtures::{self, future::Immediately as _}, + message::app::SendError, + test::fixtures::{self, future::Expect as _}, }; #[tokio::test] @@ -39,24 +39,23 @@ async fn messages_in_order() { // Verify the semantics - let events = app + let mut events = app .events() .subscribe(None) .await .expect("subscribing to a valid channel succeeds") - .filter_map(fixtures::message::events) - .take(requests.len()); + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(requests)); - let events = events.collect::<Vec<_>>().immediately().await; - - for ((sent_at, message), event) in requests.into_iter().zip(events) { + while let Some((event, (sent_at, body))) = events + .next() + .expect_ready("an event should be ready for each message") + .await + { assert_eq!(*sent_at, event.at()); - assert!(matches!( - event, - message::Event::Sent(event) - if event.message.sender == sender.login.id - && event.message.body == message - )); + assert_eq!(sender.login.id, event.message.sender); + assert_eq!(body, event.message.body); } } diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs index 216eba1..10b1e8d 100644 --- a/src/channel/routes/test.rs +++ b/src/channel/routes/test.rs @@ -7,7 +7,7 @@ use super::post; use crate::{ channel::app, name::Name, - test::fixtures::{self, future::Immediately as _}, + test::fixtures::{self, future::Expect as _}, }; #[tokio::test] @@ -39,7 +39,6 @@ async fn new_channel() { .channels() .get(&response.id) .await - .expect("searching for channels by ID never fails") .expect("the newly-created channel exists"); assert_eq!(response, channel); @@ -48,15 +47,11 @@ async fn new_channel() { .subscribe(None) .await .expect("subscribing never fails") - .filter_map(fixtures::channel::events) - .filter_map(fixtures::channel::created) + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) .filter(|event| future::ready(event.channel == response)); - let event = events - .next() - .immediately() - .await - .expect("creation event published"); + let event = events.next().expect_some("creation event published").await; assert_eq!(event.channel, response); } @@ -165,7 +160,6 @@ async fn name_reusable_after_delete() { .channels() .get(&response.id) .await - .expect("searching for channels by ID never fails") .expect("the newly-created channel exists"); assert_eq!(response, channel); } @@ -215,7 +209,6 @@ async fn name_reusable_after_expiry() { .channels() .get(&response.id) .await - .expect("searching for channels by ID never fails") .expect("the newly-created channel exists"); assert_eq!(response, channel); } diff --git a/src/clock.rs b/src/clock.rs index 9ffef82..242bcdf 100644 --- a/src/clock.rs +++ b/src/clock.rs @@ -12,7 +12,7 @@ pub type DateTime = chrono::DateTime<chrono::Utc>; // calculated once per request, even if the extractor is used in multiple // places. This requires the [middleware] function to be installed with // [axum::middleware::from_fn] around the current route. -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct RequestedAt(pub DateTime); impl RequestedAt { diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs deleted file mode 100644 index 49f8094..0000000 --- a/src/event/routes/test.rs +++ /dev/null @@ -1,459 +0,0 @@ -use axum::extract::State; -use axum_extra::extract::Query; -use futures::{ - future, - stream::{self, StreamExt as _}, -}; - -use super::get; -use crate::{ - event::Sequenced as _, - test::fixtures::{self, future::Immediately as _}, -}; - -#[tokio::test] -async fn includes_historical_message() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let event = events - .filter_map(fixtures::message::events) - .next() - .immediately() - .await - .expect("delivered stored message"); - - assert!(fixtures::event::message_sent(&event, &message)); -} - -#[tokio::test] -async fn includes_live_message() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the semantics - - let sender = fixtures::login::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - let event = events - .filter_map(fixtures::message::events) - .next() - .immediately() - .await - .expect("delivered live message"); - - assert!(fixtures::event::message_sent(&event, &message)); -} - -#[tokio::test] -async fn includes_multiple_channels() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - let channels = [ - fixtures::channel::create(&app, &fixtures::now()).await, - fixtures::channel::create(&app, &fixtures::now()).await, - ]; - - let messages = stream::iter(channels) - .then(|channel| { - let app = app.clone(); - let sender = sender.clone(); - let channel = channel.clone(); - async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } - }) - .collect::<Vec<_>>() - .await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let events = events - .filter_map(fixtures::message::events) - .take(messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - for message in &messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } -} - -#[tokio::test] -async fn sequential_messages() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - let messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let mut events = events - .filter_map(fixtures::message::events) - .filter(|event| { - future::ready( - messages - .iter() - .any(|message| fixtures::event::message_sent(event, message)), - ) - }); - - // Verify delivery in order - for message in &messages { - let event = events - .next() - .immediately() - .await - .expect("undelivered messages remaining"); - - assert!(fixtures::event::message_sent(&event, message)); - } -} - -#[tokio::test] -async fn resumes_from() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - let later_messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let resume_at = { - // First subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query::default(), - ) - .await - .expect("subscribe never fails"); - - let event = events - .filter_map(fixtures::message::events) - .next() - .immediately() - .await - .expect("delivered events"); - - assert!(fixtures::event::message_sent(&event, &initial_message)); - - event.sequence() - }; - - // Resume after disconnect - let get::Response(resumed) = get::handler( - State(app), - subscriber, - Some(resume_at.into()), - Query::default(), - ) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let events = resumed - .filter_map(fixtures::message::events) - .take(later_messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - for message in &later_messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } -} - -// This test verifies a real bug I hit developing the vector-of-sequences -// approach to resuming events. A small omission caused the event IDs in a -// resumed stream to _omit_ channels that were in the original stream until -// those channels also appeared in the resumed stream. -// -// Clients would see something like -// * In the original stream, Cfoo=5,Cbar=8 -// * In the resumed stream, Cfoo=6 (no Cbar sequence number) -// -// Disconnecting and reconnecting a second time, using event IDs from that -// initial period of the first resume attempt, would then cause the second -// resume attempt to restart all other channels from the beginning, and not -// from where the first disconnection happened. -// -// This is a real and valid behaviour for clients! -#[tokio::test] -async fn serial_resume() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; - let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let resume_at = { - let initial_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, - ]; - - // First subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query::default(), - ) - .await - .expect("subscribe never fails"); - - let events = events - .filter_map(fixtures::message::events) - .take(initial_messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - for message in &initial_messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } - - let event = events.last().expect("this vec is non-empty"); - - event.sequence() - }; - - // Resume after disconnect - let resume_at = { - let resume_messages = [ - // Note that channel_b does not appear here. The buggy behaviour - // would be masked if channel_b happened to send a new message - // into the resumed event stream. - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - ]; - - // Second subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - Some(resume_at.into()), - Query::default(), - ) - .await - .expect("subscribe never fails"); - - let events = events - .filter_map(fixtures::message::events) - .take(resume_messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - for message in &resume_messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } - - let event = events.last().expect("this vec is non-empty"); - - event.sequence() - }; - - // Resume after disconnect a second time - { - // At this point, we can send on either channel and demonstrate the - // problem. The resume point should before both of these messages, but - // after _all_ prior messages. - let final_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, - ]; - - // Third subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - Some(resume_at.into()), - Query::default(), - ) - .await - .expect("subscribe never fails"); - - let events = events - .filter_map(fixtures::message::events) - .take(final_messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - // This set of messages, in particular, _should not_ include any prior - // messages from `initial_messages` or `resume_messages`. - for message in &final_messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } - }; -} - -#[tokio::test] -async fn terminates_on_token_expiry() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - // Subscribe via the endpoint - - let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; - let subscriber = - fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; - - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the resulting stream's behaviour - - app.tokens() - .expire(&fixtures::now()) - .await - .expect("expiring tokens succeeds"); - - // These should not be delivered. - let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - assert!(events - .filter_map(fixtures::message::events) - .filter(|event| future::ready( - messages - .iter() - .any(|message| fixtures::event::message_sent(event, message)) - )) - .next() - .immediately() - .await - .is_none()); -} - -#[tokio::test] -async fn terminates_on_logout() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - // Subscribe via the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query::default(), - ) - .await - .expect("subscribe never fails"); - - // Verify the resulting stream's behaviour - - app.tokens() - .logout(&subscriber.token) - .await - .expect("expiring tokens succeeds"); - - // These should not be delivered. - let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - assert!(events - .filter_map(fixtures::message::events) - .filter(|event| future::ready( - messages - .iter() - .any(|message| fixtures::event::message_sent(event, message)) - )) - .next() - .immediately() - .await - .is_none()); -} diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs new file mode 100644 index 0000000..6a0a803 --- /dev/null +++ b/src/event/routes/test/channel.rs @@ -0,0 +1,241 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::{ + event::routes::get, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn creating() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Create a channel + + let name = fixtures::channel::propose(); + let channel = app + .channels() + .create(&name, &fixtures::now()) + .await + .expect("creating a channel succeeds"); + + // Verify channel created event + + events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .filter(|event| future::ready(event.channel == channel)) + .next() + .expect_some("channel created event is delivered") + .await; +} + +#[tokio::test] +async fn previously_created() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Create a channel + + let name = fixtures::channel::propose(); + let channel = app + .channels() + .create(&name, &fixtures::now()) + .await + .expect("creating a channel succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify channel created event + + let _ = events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .filter(|event| future::ready(event.channel == channel)) + .next() + .expect_some("channel created event is delivered") + .await; +} + +#[tokio::test] +async fn expiring() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Expire channels + + app.channels() + .expire(&fixtures::now()) + .await + .expect("expiring channels always succeeds"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .expect_some("a deleted channel event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + + // Expire channels + + app.channels() + .expire(&fixtures::now()) + .await + .expect("expiring channels always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .expect_some("a deleted channel event will be delivered") + .await; +} + +#[tokio::test] +async fn deleting() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Delete the channel + + app.channels() + .delete(&channel.id, &fixtures::now()) + .await + .expect("deleting a valid channel succeeds"); + + // Check for delete event + let _ = events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .expect_some("a deleted channel event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + + // Delete the channel + + app.channels() + .delete(&channel.id, &fixtures::now()) + .await + .expect("deleting a valid channel succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .expect_some("a deleted channel event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_purged() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + + // Delete and purge the channel + + app.channels() + .delete(&channel.id, &fixtures::ancient()) + .await + .expect("deleting a valid channel succeeds"); + + app.channels() + .purge(&fixtures::now()) + .await + .expect("purging channels always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Check for expiry event + events + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .expect_wait("deleted channel events not delivered") + .await; +} diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs new file mode 100644 index 0000000..d24f474 --- /dev/null +++ b/src/event/routes/test/invite.rs @@ -0,0 +1,80 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::{ + event::routes::get, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn accepting_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::login::create(&app, &fixtures::now()).await; + let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Accept the invite + + let (name, password) = fixtures::login::propose(); + let (joiner, _) = app + .invites() + .accept(&invite.id, &name, &password, &fixtures::now()) + .await + .expect("accepting an invite succeeds"); + + // Expect a login created event + + let _ = events + .filter_map(fixtures::event::login) + .filter_map(fixtures::event::login::created) + .filter(|event| future::ready(event.login == joiner)) + .next() + .expect_some("a login created event is sent") + .await; +} + +#[tokio::test] +async fn previously_accepted_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::login::create(&app, &fixtures::now()).await; + let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + + // Accept the invite + + let (name, password) = fixtures::login::propose(); + let (joiner, _) = app + .invites() + .accept(&invite.id, &name, &password, &fixtures::now()) + .await + .expect("accepting an invite succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Expect a login created event + + let _ = events + .filter_map(fixtures::event::login) + .filter_map(fixtures::event::login::created) + .filter(|event| future::ready(event.login == joiner)) + .next() + .expect_some("a login created event is sent") + .await; +} diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs new file mode 100644 index 0000000..63a3f43 --- /dev/null +++ b/src/event/routes/test/message.rs @@ -0,0 +1,349 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{ + future, + stream::{self, StreamExt as _}, +}; + +use crate::{ + event::routes::get, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn sending() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Send a message + + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let message = app + .messages() + .send( + &channel.id, + &sender, + &fixtures::now(), + &fixtures::message::propose(), + ) + .await + .expect("sending a message succeeds"); + + // Verify that an event is delivered + + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(event.message == message)) + .next() + .expect_some("delivered message sent event") + .await; +} + +#[tokio::test] +async fn previously_sent() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + + // Send a message + + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let message = app + .messages() + .send( + &channel.id, + &sender, + &fixtures::now(), + &fixtures::message::propose(), + ) + .await + .expect("sending a message succeeds"); + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify that an event is delivered + + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(event.message == message)) + .next() + .expect_some("delivered message sent event") + .await; +} + +#[tokio::test] +async fn sent_in_multiple_channels() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + + let channels = [ + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, + ]; + + let messages = stream::iter(channels) + .then(|channel| { + let app = app.clone(); + let sender = sender.clone(); + let channel = channel.clone(); + async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } + }) + .collect::<Vec<_>>() + .await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify the structure of the response. + + let events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .take(messages.len()) + .collect::<Vec<_>>() + .expect_ready("events ready") + .await; + + for message in &messages { + assert!(events.iter().any(|event| &event.message == message)); + } +} + +#[tokio::test] +async fn sent_sequentially() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + + let messages = vec![ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify the expected events in the expected order + + let mut events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))); + + for message in &messages { + let event = events + .next() + .expect_some("undelivered messages remaining") + .await; + + assert_eq!(message, &event.message); + } +} + +#[tokio::test] +async fn expiring() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let sender = fixtures::login::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Expire messages + + app.messages() + .expire(&fixtures::now()) + .await + .expect("expiring messages always succeeds"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_some("a deleted message event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let sender = fixtures::login::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + + // Expire messages + + app.messages() + .expire(&fixtures::now()) + .await + .expect("expiring messages always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_some("a deleted message event will be delivered") + .await; +} + +#[tokio::test] +async fn deleting() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Delete the message + + app.messages() + .delete(&message.id, &fixtures::now()) + .await + .expect("deleting a valid message succeeds"); + + // Check for delete event + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_some("a deleted message event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + // Delete the message + + app.messages() + .delete(&message.id, &fixtures::now()) + .await + .expect("deleting a valid message succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Check for delete event + let _ = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_some("a deleted message event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_purged() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let sender = fixtures::login::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + + // Purge the message + + app.messages() + .delete(&message.id, &fixtures::ancient()) + .await + .expect("deleting a valid message succeeds"); + + app.messages() + .purge(&fixtures::now()) + .await + .expect("purge always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Check for delete event + + events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .expect_wait("no deleted message will be delivered") + .await; +} diff --git a/src/event/routes/test/mod.rs b/src/event/routes/test/mod.rs new file mode 100644 index 0000000..e7e35f1 --- /dev/null +++ b/src/event/routes/test/mod.rs @@ -0,0 +1,6 @@ +mod channel; +mod invite; +mod message; +mod resume; +mod setup; +mod token; diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs new file mode 100644 index 0000000..62b9bad --- /dev/null +++ b/src/event/routes/test/resume.rs @@ -0,0 +1,219 @@ +use std::future; + +use axum::extract::State; +use axum_extra::extract::Query; +use futures::stream::{self, StreamExt as _}; + +use crate::{ + event::{routes::get, Sequenced as _}, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + + let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + let later_messages = vec![ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let resume_at = { + // First subscription + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); + + let event = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(event.message == initial_message)) + .next() + .expect_some("delivered event for initial message") + .await; + + event.sequence() + }; + + // Resume after disconnect + let get::Response(resumed) = get::handler( + State(app), + subscriber, + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Verify final events + + let mut events = resumed + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(later_messages)); + + while let Some((event, message)) = events.next().expect_ready("event ready").await { + assert_eq!(message, event.message); + } +} + +// This test verifies a real bug I hit developing the vector-of-sequences +// approach to resuming events. A small omission caused the event IDs in a +// resumed stream to _omit_ channels that were in the original stream until +// those channels also appeared in the resumed stream. +// +// Clients would see something like +// * In the original stream, Cfoo=5,Cbar=8 +// * In the resumed stream, Cfoo=6 (no Cbar sequence number) +// +// Disconnecting and reconnecting a second time, using event IDs from that +// initial period of the first resume attempt, would then cause the second +// resume attempt to restart all other channels from the beginning, and not +// from where the first disconnection happened. +// +// As we have switched to a single global event sequence number, this scenario +// can no longer arise, but this test is preserved because the actual behaviour +// _is_ a valid way for clients to behave, and should work. We might as well +// keep testing it. +#[tokio::test] +async fn serial_resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; + let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let resume_at = { + let initial_messages = [ + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + ]; + + // First subscription + + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(initial_messages)) + .collect::<Vec<_>>() + .expect_ready("zipping a finite list of events is ready immediately") + .await; + + assert!(events + .iter() + .all(|(event, message)| message == &event.message)); + + let (event, _) = events.last().expect("this vec is non-empty"); + + // Take the last one's resume point + + event.sequence() + }; + + // Resume after disconnect + let resume_at = { + let resume_messages = [ + // Note that channel_b does not appear here. The buggy behaviour + // would be masked if channel_b happened to send a new message + // into the resumed event stream. + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + ]; + + // Second subscription + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(resume_messages)) + .collect::<Vec<_>>() + .expect_ready("zipping a finite list of events is ready immediately") + .await; + + assert!(events + .iter() + .all(|(event, message)| message == &event.message)); + + let (event, _) = events.last().expect("this vec is non-empty"); + + // Take the last one's resume point + + event.sequence() + }; + + // Resume after disconnect a second time + { + // At this point, we can send on either channel and demonstrate the + // problem. The resume point should before both of these messages, but + // after _all_ prior messages. + let final_messages = [ + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + ]; + + // Third subscription + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .zip(stream::iter(final_messages)) + .collect::<Vec<_>>() + .expect_ready("zipping a finite list of events is ready immediately") + .await; + + assert!(events + .iter() + .all(|(event, message)| message == &event.message)); + }; +} diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs new file mode 100644 index 0000000..007b03d --- /dev/null +++ b/src/event/routes/test/setup.rs @@ -0,0 +1,45 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::{ + event::routes::get, + test::fixtures::{self, future::Expect as _}, +}; + +// There's no test for this in subscribe-then-setup order because creating an +// identity to subscribe with also completes initial setup, preventing the +// test from running. That is also a can't-happen scenario in reality. +#[tokio::test] +async fn previously_completed() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Complete initial setup + + let (name, password) = fixtures::login::propose(); + let (owner, _) = app + .setup() + .initial(&name, &password, &fixtures::now()) + .await + .expect("initial setup in an empty app succeeds"); + + // Subscribe to events + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Expect a login created event + + let _ = events + .filter_map(fixtures::event::login) + .filter_map(fixtures::event::login::created) + .filter(|event| future::ready(event.login == owner)) + .next() + .expect_some("a login created event is sent") + .await; +} diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs new file mode 100644 index 0000000..2039d9b --- /dev/null +++ b/src/event/routes/test/token.rs @@ -0,0 +1,95 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::{ + event::routes::get, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn terminates_on_token_expiry() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + + // Subscribe via the endpoint + + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; + let subscriber = + fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; + + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify the resulting stream's behaviour + + app.tokens() + .expire(&fixtures::now()) + .await + .expect("expiring tokens succeeds"); + + // These should not be delivered. + let messages = [ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) + .next() + .expect_none("end of stream") + .await; +} + +#[tokio::test] +async fn terminates_on_logout() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + + // Subscribe via the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Verify the resulting stream's behaviour + + app.tokens() + .logout(&subscriber.token) + .await + .expect("expiring tokens succeeds"); + + // These should not be delivered. + + let messages = [ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) + .next() + .expect_none("end of stream") + .await; +} diff --git a/src/invite/app.rs b/src/invite/app.rs index 64ba753..176075f 100644 --- a/src/invite/app.rs +++ b/src/invite/app.rs @@ -5,7 +5,7 @@ use super::{repo::Provider as _, Id, Invite, Summary}; use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, - event::repo::Provider as _, + event::{repo::Provider as _, Broadcaster, Event}, login::{repo::Provider as _, Login, Password}, name::Name, token::{repo::Provider as _, Secret}, @@ -13,18 +13,15 @@ use crate::{ pub struct Invites<'a> { db: &'a SqlitePool, + events: &'a Broadcaster, } impl<'a> Invites<'a> { - pub const fn new(db: &'a SqlitePool) -> Self { - Self { db } + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } } - pub async fn create( - &self, - issuer: &Login, - issued_at: &DateTime, - ) -> Result<Invite, sqlx::Error> { + pub async fn issue(&self, issuer: &Login, issued_at: &DateTime) -> Result<Invite, sqlx::Error> { let mut tx = self.db.begin().await?; let invite = tx.invites().create(issuer, issued_at).await?; tx.commit().await?; @@ -73,6 +70,9 @@ impl<'a> Invites<'a> { let secret = tx.tokens().issue(&login, accepted_at).await?; tx.commit().await?; + self.events + .broadcast(login.events().map(Event::from).collect::<Vec<_>>()); + Ok((login.as_created(), secret)) } diff --git a/src/invite/mod.rs b/src/invite/mod.rs index d59fb9c..53ca984 100644 --- a/src/invite/mod.rs +++ b/src/invite/mod.rs @@ -14,7 +14,7 @@ pub struct Invite { pub issued_at: DateTime, } -#[derive(serde::Serialize)] +#[derive(Debug, serde::Serialize)] pub struct Summary { pub id: Id, pub issuer: nfc::String, diff --git a/src/invite/routes/invite/mod.rs b/src/invite/routes/invite/mod.rs index 04593fd..c22029a 100644 --- a/src/invite/routes/invite/mod.rs +++ b/src/invite/routes/invite/mod.rs @@ -1,4 +1,6 @@ pub mod get; pub mod post; +#[cfg(test)] +pub mod test; type PathInfo = crate::invite::Id; diff --git a/src/invite/routes/invite/post.rs b/src/invite/routes/invite/post.rs index 3ca4e6b..0dd8dba 100644 --- a/src/invite/routes/invite/post.rs +++ b/src/invite/routes/invite/post.rs @@ -36,7 +36,8 @@ pub struct Request { pub password: Password, } -pub struct Error(app::AcceptError); +#[derive(Debug)] +pub struct Error(pub app::AcceptError); impl IntoResponse for Error { fn into_response(self) -> Response { diff --git a/src/invite/routes/invite/test/get.rs b/src/invite/routes/invite/test/get.rs new file mode 100644 index 0000000..c6780ed --- /dev/null +++ b/src/invite/routes/invite/test/get.rs @@ -0,0 +1,65 @@ +use axum::extract::{Json, Path, State}; + +use crate::{invite::routes::invite::get, test::fixtures}; + +#[tokio::test] +async fn valid_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::login::create(&app, &fixtures::now()).await; + let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + + // Call endpoint + + let Json(response) = get::handler(State(app), Path(invite.id)) + .await + .expect("get for an existing invite succeeds"); + + // Verify response + + assert_eq!(issuer.name.display(), &response.issuer); + assert_eq!(invite.issued_at, response.issued_at); +} + +#[tokio::test] +async fn nonexistent_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Call endpoint + + let invite = fixtures::invite::fictitious(); + let error = get::handler(State(app), Path(invite.clone())) + .await + .expect_err("get for a nonexistent invite fails"); + + // Verify response + + assert!(matches!(error, get::Error::NotFound(error_id) if invite == error_id)); +} + +#[tokio::test] +async fn expired_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::login::create(&app, &fixtures::ancient()).await; + let invite = fixtures::invite::issue(&app, &issuer, &fixtures::ancient()).await; + + app.invites() + .expire(&fixtures::now()) + .await + .expect("expiring invites never fails"); + + // Call endpoint + + let error = get::handler(State(app), Path(invite.id.clone())) + .await + .expect_err("get for an expired invite fails"); + + // Verify response + + assert!(matches!(error, get::Error::NotFound(error_id) if invite.id == error_id)); +} diff --git a/src/invite/routes/invite/test/mod.rs b/src/invite/routes/invite/test/mod.rs new file mode 100644 index 0000000..d6c1f06 --- /dev/null +++ b/src/invite/routes/invite/test/mod.rs @@ -0,0 +1,2 @@ +mod get; +mod post; diff --git a/src/invite/routes/invite/test/post.rs b/src/invite/routes/invite/test/post.rs new file mode 100644 index 0000000..65ab61e --- /dev/null +++ b/src/invite/routes/invite/test/post.rs @@ -0,0 +1,208 @@ +use axum::extract::{Json, Path, State}; + +use crate::{ + invite::{app::AcceptError, routes::invite::post}, + name::Name, + test::fixtures, +}; + +#[tokio::test] +async fn valid_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::login::create(&app, &fixtures::now()).await; + let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + + // Call the endpoint + + let (name, password) = fixtures::login::propose(); + let identity = fixtures::cookie::not_logged_in(); + let request = post::Request { + name: name.clone(), + password: password.clone(), + }; + let (identity, Json(response)) = post::handler( + State(app.clone()), + fixtures::now(), + identity, + Path(invite.id), + Json(request), + ) + .await + .expect("accepting a valid invite succeeds"); + + // Verify the response + + assert!(identity.secret().is_some()); + assert_eq!(name, response.name); + + // Verify that the issued token is valid + + let secret = identity + .secret() + .expect("newly-issued identity has a token secret"); + let (_, login) = app + .tokens() + .validate(&secret, &fixtures::now()) + .await + .expect("newly-issued identity cookie is valid"); + assert_eq!(response, login); + + // Verify that the given credentials can log in + + let (login, _) = app + .tokens() + .login(&name, &password, &fixtures::now()) + .await + .expect("credentials given on signup are valid"); + assert_eq!(response, login); +} + +#[tokio::test] +async fn nonexistent_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let invite = fixtures::invite::fictitious(); + + // Call the endpoint + + let (name, password) = fixtures::login::propose(); + let identity = fixtures::cookie::not_logged_in(); + let request = post::Request { + name: name.clone(), + password: password.clone(), + }; + let post::Error(error) = post::handler( + State(app.clone()), + fixtures::now(), + identity, + Path(invite.clone()), + Json(request), + ) + .await + .expect_err("accepting a nonexistent invite fails"); + + // Verify the response + + assert!(matches!(error, AcceptError::NotFound(error_id) if error_id == invite)); +} + +#[tokio::test] +async fn expired_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::login::create(&app, &fixtures::ancient()).await; + let invite = fixtures::invite::issue(&app, &issuer, &fixtures::ancient()).await; + + app.invites() + .expire(&fixtures::now()) + .await + .expect("expiring invites never fails"); + + // Call the endpoint + + let (name, password) = fixtures::login::propose(); + let identity = fixtures::cookie::not_logged_in(); + let request = post::Request { + name: name.clone(), + password: password.clone(), + }; + let post::Error(error) = post::handler( + State(app.clone()), + fixtures::now(), + identity, + Path(invite.id.clone()), + Json(request), + ) + .await + .expect_err("accepting a nonexistent invite fails"); + + // Verify the response + + assert!(matches!(error, AcceptError::NotFound(error_id) if error_id == invite.id)); +} + +#[tokio::test] +async fn accepted_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::login::create(&app, &fixtures::ancient()).await; + let invite = fixtures::invite::issue(&app, &issuer, &fixtures::ancient()).await; + + let (name, password) = fixtures::login::propose(); + app.invites() + .accept(&invite.id, &name, &password, &fixtures::now()) + .await + .expect("accepting a valid invite succeeds"); + + // Call the endpoint + + let (name, password) = fixtures::login::propose(); + let identity = fixtures::cookie::not_logged_in(); + let request = post::Request { + name: name.clone(), + password: password.clone(), + }; + let post::Error(error) = post::handler( + State(app.clone()), + fixtures::now(), + identity, + Path(invite.id.clone()), + Json(request), + ) + .await + .expect_err("accepting a nonexistent invite fails"); + + // Verify the response + + assert!(matches!(error, AcceptError::NotFound(error_id) if error_id == invite.id)); +} + +#[tokio::test] +async fn conflicting_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::login::create(&app, &fixtures::ancient()).await; + let invite = fixtures::invite::issue(&app, &issuer, &fixtures::ancient()).await; + + let existing_name = Name::from("rijksmuseum"); + app.logins() + .create( + &existing_name, + &fixtures::login::propose_password(), + &fixtures::now(), + ) + .await + .expect("creating a login in an empty environment succeeds"); + + // Call the endpoint + + let conflicting_name = Name::from("r\u{0133}ksmuseum"); + let password = fixtures::login::propose_password(); + + let identity = fixtures::cookie::not_logged_in(); + let request = post::Request { + name: conflicting_name.clone(), + password: password.clone(), + }; + let post::Error(error) = post::handler( + State(app.clone()), + fixtures::now(), + identity, + Path(invite.id.clone()), + Json(request), + ) + .await + .expect_err("accepting a nonexistent invite fails"); + + // Verify the response + + assert!( + matches!(error, AcceptError::DuplicateLogin(error_name) if error_name == conflicting_name) + ); +} diff --git a/src/invite/routes/mod.rs b/src/invite/routes/mod.rs index dae20ba..2f7375c 100644 --- a/src/invite/routes/mod.rs +++ b/src/invite/routes/mod.rs @@ -7,6 +7,8 @@ use crate::app::App; mod invite; mod post; +#[cfg(test)] +mod test; pub fn router() -> Router<App> { Router::new() diff --git a/src/invite/routes/post.rs b/src/invite/routes/post.rs index eb7d706..898081e 100644 --- a/src/invite/routes/post.rs +++ b/src/invite/routes/post.rs @@ -10,7 +10,7 @@ pub async fn handler( identity: Identity, _: Json<Request>, ) -> Result<Json<Invite>, Internal> { - let invite = app.invites().create(&identity.login, &issued_at).await?; + let invite = app.invites().issue(&identity.login, &issued_at).await?; Ok(Json(invite)) } diff --git a/src/invite/routes/test.rs b/src/invite/routes/test.rs new file mode 100644 index 0000000..4d99660 --- /dev/null +++ b/src/invite/routes/test.rs @@ -0,0 +1,28 @@ +use axum::extract::{Json, State}; + +use super::post; +use crate::test::fixtures; + +#[tokio::test] +async fn create_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::identity::create(&app, &fixtures::now()).await; + let issued_at = fixtures::now(); + + // Call the endpoint + + let Json(invite) = post::handler( + State(app), + issued_at.clone(), + issuer.clone(), + Json(post::Request {}), + ) + .await + .expect("creating an invite always succeeds"); + + // Verify the response + assert_eq!(issuer.login.id, invite.issuer); + assert_eq!(&*issued_at, &invite.issued_at); +} diff --git a/src/login/password.rs b/src/login/password.rs index c27c950..e1d164e 100644 --- a/src/login/password.rs +++ b/src/login/password.rs @@ -31,7 +31,7 @@ impl fmt::Debug for StoredHash { } } -#[derive(serde::Deserialize)] +#[derive(Clone, serde::Deserialize)] #[serde(transparent)] pub struct Password(nfc::String); diff --git a/src/message/routes/message/mod.rs b/src/message/routes/message/mod.rs index 545ad26..45a7e9d 100644 --- a/src/message/routes/message/mod.rs +++ b/src/message/routes/message/mod.rs @@ -3,9 +3,9 @@ mod test; pub mod delete { use axum::{ - extract::{Path, State}, + extract::{Json, Path, State}, http::StatusCode, - response::{IntoResponse, Response}, + response::{self, IntoResponse}, }; use crate::{ @@ -21,10 +21,21 @@ pub mod delete { Path(message): Path<message::Id>, RequestedAt(deleted_at): RequestedAt, _: Identity, - ) -> Result<StatusCode, Error> { + ) -> Result<Response, Error> { app.messages().delete(&message, &deleted_at).await?; - Ok(StatusCode::ACCEPTED) + Ok(Response { id: message }) + } + + #[derive(Debug, serde::Serialize)] + pub struct Response { + pub id: message::Id, + } + + impl IntoResponse for Response { + fn into_response(self) -> response::Response { + (StatusCode::ACCEPTED, Json(self)).into_response() + } } #[derive(Debug, thiserror::Error)] @@ -32,7 +43,7 @@ pub mod delete { pub struct Error(#[from] pub DeleteError); impl IntoResponse for Error { - fn into_response(self) -> Response { + fn into_response(self) -> response::Response { let Self(error) = self; #[allow(clippy::match_wildcard_for_single_variants)] match error { diff --git a/src/message/routes/message/test.rs b/src/message/routes/message/test.rs index 2016fb8..ae89506 100644 --- a/src/message/routes/message/test.rs +++ b/src/message/routes/message/test.rs @@ -1,7 +1,4 @@ -use axum::{ - extract::{Path, State}, - http::StatusCode, -}; +use axum::extract::{Path, State}; use super::delete; use crate::{message::app, test::fixtures}; @@ -29,7 +26,7 @@ pub async fn delete_message() { // Verify the response - assert_eq!(response, StatusCode::ACCEPTED); + assert_eq!(message.id, response.id); // Verify the semantics diff --git a/src/setup/routes/mod.rs b/src/setup/routes/mod.rs index e1e1711..6054983 100644 --- a/src/setup/routes/mod.rs +++ b/src/setup/routes/mod.rs @@ -3,6 +3,8 @@ use axum::{routing::post, Router}; use crate::app::App; mod post; +#[cfg(test)] +mod test; pub fn router() -> Router<App> { Router::new().route("/api/setup", post(post::handler)) diff --git a/src/setup/routes/test.rs b/src/setup/routes/test.rs new file mode 100644 index 0000000..f7562ae --- /dev/null +++ b/src/setup/routes/test.rs @@ -0,0 +1,69 @@ +use axum::extract::{Json, State}; + +use super::post; +use crate::{setup::app, test::fixtures}; + +#[tokio::test] +async fn fresh_instance() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Call the endpoint + let identity = fixtures::cookie::not_logged_in(); + let (name, password) = fixtures::login::propose(); + let request = post::Request { + name: name.clone(), + password: password.clone(), + }; + let (identity, Json(response)) = + post::handler(State(app.clone()), fixtures::now(), identity, Json(request)) + .await + .expect("setup in a fresh app succeeds"); + + // Verify the response + + assert_eq!(name, response.name); + + // Verify that the issued token is valid + + let secret = identity + .secret() + .expect("newly-issued identity has a token secret"); + let (_, login) = app + .tokens() + .validate(&secret, &fixtures::now()) + .await + .expect("newly-issued identity cookie is valid"); + assert_eq!(response, login); + + // Verify that the given credentials can log in + + let (login, _) = app + .tokens() + .login(&name, &password, &fixtures::now()) + .await + .expect("credentials given on signup are valid"); + assert_eq!(response, login); +} + +#[tokio::test] +async fn login_exists() { + // Set up the environment + + let app = fixtures::scratch_app().await; + fixtures::login::create(&app, &fixtures::now()).await; + + // Call the endpoint + let identity = fixtures::cookie::not_logged_in(); + let (name, password) = fixtures::login::propose(); + let request = post::Request { name, password }; + let post::Error(error) = + post::handler(State(app.clone()), fixtures::now(), identity, Json(request)) + .await + .expect_err("setup in a populated app fails"); + + // Verify the response + + assert!(matches!(error, app::Error::SetupCompleted)); +} diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs index 8cb38ae..0c6480b 100644 --- a/src/test/fixtures/channel.rs +++ b/src/test/fixtures/channel.rs @@ -1,5 +1,3 @@ -use std::future; - use faker_rand::{ en_us::{addresses::CityName, names::FullName}, faker_impl_from_templates, @@ -10,7 +8,6 @@ use crate::{ app::App, channel::{self, Channel}, clock::RequestedAt, - event::Event, name::Name, }; @@ -31,20 +28,6 @@ faker_impl_from_templates! { NameTemplate; "{} {}", CityName, FullName; } -pub fn events(event: Event) -> future::Ready<Option<channel::Event>> { - future::ready(match event { - Event::Channel(channel) => Some(channel), - _ => None, - }) -} - -pub fn created(event: channel::Event) -> future::Ready<Option<channel::event::Created>> { - future::ready(match event { - channel::Event::Created(event) => Some(event), - channel::Event::Deleted(_) => None, - }) -} - pub fn fictitious() -> channel::Id { channel::Id::generate() } diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs index fa4fbc0..de02d4d 100644 --- a/src/test/fixtures/event.rs +++ b/src/test/fixtures/event.rs @@ -1,8 +1,79 @@ -use crate::message::{Event, Message}; +use std::future::{self, Ready}; -pub fn message_sent(event: &Event, message: &Message) -> bool { - matches!( - &event, - Event::Sent(event) if message == &event.into() - ) +use crate::event::Event; + +pub fn channel(event: Event) -> Ready<Option<channel::Event>> { + future::ready(match event { + Event::Channel(channel) => Some(channel), + _ => None, + }) +} + +pub fn message(event: Event) -> Ready<Option<message::Event>> { + future::ready(match event { + Event::Message(event) => Some(event), + _ => None, + }) +} + +pub fn login(event: Event) -> Ready<Option<login::Event>> { + future::ready(match event { + Event::Login(event) => Some(event), + _ => None, + }) +} + +pub mod channel { + use std::future::{self, Ready}; + + use crate::channel::event; + pub use crate::channel::Event; + + pub fn created(event: Event) -> Ready<Option<event::Created>> { + future::ready(match event { + Event::Created(event) => Some(event), + Event::Deleted(_) => None, + }) + } + + pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> { + future::ready(match event { + Event::Deleted(event) => Some(event), + Event::Created(_) => None, + }) + } +} + +pub mod message { + use std::future::{self, Ready}; + + use crate::message::event; + pub use crate::message::Event; + + pub fn sent(event: Event) -> Ready<Option<event::Sent>> { + future::ready(match event { + Event::Sent(event) => Some(event), + Event::Deleted(_) => None, + }) + } + + pub fn deleted(event: Event) -> future::Ready<Option<event::Deleted>> { + future::ready(match event { + Event::Deleted(event) => Some(event), + Event::Sent(_) => None, + }) + } +} + +pub mod login { + use std::future::{self, Ready}; + + use crate::login::event; + pub use crate::login::Event; + + pub fn created(event: Event) -> Ready<Option<event::Created>> { + future::ready(match event { + Event::Created(event) => Some(event), + }) + } } diff --git a/src/test/fixtures/future.rs b/src/test/fixtures/future.rs index bbdc9f8..2f810a3 100644 --- a/src/test/fixtures/future.rs +++ b/src/test/fixtures/future.rs @@ -1,55 +1,221 @@ -use std::{future::IntoFuture, time::Duration}; +use std::{future::Future, pin::Pin, task}; -use futures::{stream, Stream}; -use tokio::time::timeout; +use futures::stream; -async fn immediately<F>(fut: F) -> F::Output +// Combinators for futures that prevent waits, even when the underlying future +// would block. +// +// These are only useful for futures with no bound on how long they may wait, +// and this trait is only implemented on futures that are likely to have that +// characteristic. Trying to apply this to futures that already have some +// bounded wait time may make tests fail inappropriately and can hide other +// logic errors. +pub trait Expect: Sized { + // The returned future expects the underlying future to be ready immediately, + // and panics with the provided message if it is not. + // + // For stream operations, can be used to assert immediate completion. + fn expect_ready(self, message: &str) -> Ready<Self> + where + Self: Future; + + // The returned future expects the underlying future _not_ to be ready, and + // panics if it is. This is usually a useful proxy for "I expect this to never + // arrive" or "to not be here yet." The future is transformed to return `()`, + // since the underlying future can never provide a value. + // + // For stream operations, can be used to assert that completion hasn't happened + // yet. + fn expect_wait(self, message: &str) -> Wait<Self> + where + Self: Future; + + // The returned future expects the underlying future to resolve immediately, to + // a `Some` value. If it resolves to `None` or is not ready, it panics. The + // future is transformed to return the inner value from the `Some` case, like + // [`Option::expect`]. + // + // For stream operations, can be used to assert that the stream has at least one + // message. + fn expect_some<T>(self, message: &str) -> Some<Self> + where + Self: Future<Output = Option<T>>; + + // The returned future expects the underlying future to resolve immediately, to + // a `None` value. If it resolves to `Some(_)`, or is not ready, it panics. The + // future is transformed to return `()`, since the underlying future's value is + // fixed. + // + // For stream operations, can be used to assert that the stream has ended. + fn expect_none<T>(self, message: &str) -> None<Self> + where + Self: Future<Output = Option<T>>; +} + +impl<'a, St> Expect for stream::Next<'a, St> { + fn expect_ready(self, message: &str) -> Ready<Self> { + Ready { + future: self, + message, + } + } + + fn expect_wait(self, message: &str) -> Wait<Self> { + Wait { + future: self, + message, + } + } + + fn expect_some<T>(self, message: &str) -> Some<Self> + where + Self: Future<Output = Option<T>>, + { + Some { + future: self, + message, + } + } + + fn expect_none<T>(self, message: &str) -> None<Self> + where + Self: Future<Output = Option<T>>, + { + None { + future: self, + message, + } + } +} + +impl<St, C> Expect for stream::Collect<St, C> { + fn expect_ready(self, message: &str) -> Ready<Self> { + Ready { + future: self, + message, + } + } + + fn expect_wait(self, message: &str) -> Wait<Self> { + Wait { + future: self, + message, + } + } + + fn expect_some<T>(self, message: &str) -> Some<Self> + where + Self: Future<Output = Option<T>>, + { + Some { + future: self, + message, + } + } + + fn expect_none<T>(self, message: &str) -> None<Self> + where + Self: Future<Output = Option<T>>, + { + None { + future: self, + message, + } + } +} + +#[pin_project::pin_project] +pub struct Ready<'m, F> { + #[pin] + future: F, + message: &'m str, +} + +impl<'m, F> Future for Ready<'m, F> where - F: IntoFuture, + F: Future + std::fmt::Debug, { - // I haven't been particularly rigorous here. Zero delay _seems to work_, - // but this can be set higher; it makes tests that fail to meet the - // "immediate" expectation take longer, but gives slow tests time to - // succeed, as well. - let duration = Duration::from_nanos(0); - timeout(duration, fut) - .await - .expect("expected result immediately") -} - -// This is only intended for streams, since their `next()`, `collect()`, and -// so on can all block indefinitely on an empty stream. There's no need to -// force immediacy on futures that "can't" block forever, and it can hide logic -// errors if you do that. -// -// The impls below _could_ be replaced with a blanket impl for all future -// types, otherwise. The choice to restrict impls to stream futures is -// deliberate. -pub trait Immediately { - type Output; + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> { + let this = self.project(); + + if let task::Poll::Ready(value) = this.future.poll(cx) { + task::Poll::Ready(value) + } else { + panic!("{}", this.message); + } + } +} + +#[pin_project::pin_project] +pub struct Wait<'m, F> { + #[pin] + future: F, + message: &'m str, +} - async fn immediately(self) -> Self::Output; +impl<'m, F> Future for Wait<'m, F> +where + F: Future + std::fmt::Debug, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> { + let this = self.project(); + + if this.future.poll(cx).is_pending() { + task::Poll::Ready(()) + } else { + panic!("{}", this.message); + } + } } -impl<'a, St> Immediately for stream::Next<'a, St> +#[pin_project::pin_project] +pub struct Some<'m, F> { + #[pin] + future: F, + message: &'m str, +} + +impl<'m, F, T> Future for Some<'m, F> where - St: Stream + Unpin + ?Sized, + F: Future<Output = Option<T>> + std::fmt::Debug, { - type Output = Option<<St as Stream>::Item>; + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> { + let this = self.project(); - async fn immediately(self) -> Self::Output { - immediately(self).await + if let task::Poll::Ready(Option::Some(value)) = this.future.poll(cx) { + task::Poll::Ready(value) + } else { + panic!("{}", this.message) + } } } -impl<St, C> Immediately for stream::Collect<St, C> +#[pin_project::pin_project] +pub struct None<'m, F> { + #[pin] + future: F, + message: &'m str, +} + +impl<'m, F, T> Future for None<'m, F> where - St: Stream, - C: Default + Extend<<St as Stream>::Item>, + F: Future<Output = Option<T>> + std::fmt::Debug, { - type Output = C; + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> { + let this = self.project(); - async fn immediately(self) -> Self::Output { - immediately(self).await + if let task::Poll::Ready(Option::None) = this.future.poll(cx) { + task::Poll::Ready(()) + } else { + panic!("{}", this.message) + } } } diff --git a/src/test/fixtures/invite.rs b/src/test/fixtures/invite.rs new file mode 100644 index 0000000..654d1b4 --- /dev/null +++ b/src/test/fixtures/invite.rs @@ -0,0 +1,17 @@ +use crate::{ + app::App, + clock::DateTime, + invite::{self, Invite}, + login::Login, +}; + +pub async fn issue(app: &App, issuer: &Login, issued_at: &DateTime) -> Invite { + app.invites() + .issue(issuer, issued_at) + .await + .expect("issuing invites never fails") +} + +pub fn fictitious() -> invite::Id { + invite::Id::generate() +} diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs index 3aebdd9..d3b4719 100644 --- a/src/test/fixtures/message.rs +++ b/src/test/fixtures/message.rs @@ -1,12 +1,9 @@ -use std::future; - use faker_rand::lorem::Paragraphs; use crate::{ app::App, channel::Channel, clock::RequestedAt, - event::Event, login::Login, message::{self, Body, Message}, }; @@ -24,13 +21,6 @@ pub fn propose() -> Body { rand::random::<Paragraphs>().to_string().into() } -pub fn events(event: Event) -> future::Ready<Option<message::Event>> { - future::ready(match event { - Event::Message(event) => Some(event), - _ => None, - }) -} - pub fn fictitious() -> message::Id { message::Id::generate() } diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 9111811..2b7b6af 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -7,6 +7,7 @@ pub mod cookie; pub mod event; pub mod future; pub mod identity; +pub mod invite; pub mod login; pub mod message; diff --git a/src/ui/mime.rs b/src/ui/mime.rs index 9c724f0..7818ac1 100644 --- a/src/ui/mime.rs +++ b/src/ui/mime.rs @@ -1,7 +1,10 @@ use mime::Mime; use unix_path::Path; -// Extremely manual; using `std::path` here would result in platform-dependent behaviour when it's not appropriate (the URLs passed here always use `/` and are parsed like URLs). Using `unix_path` might be an option, but it's not clearly +// Extremely manual; using `std::path` here would result in platform-dependent +// behaviour when it's not appropriate (the URLs passed here always use `/` and +// are parsed like URLs). Using `unix_path` might be an option, but it's not +// clearly pub fn from_path<P>(path: P) -> Result<Mime, mime::FromStrError> where P: AsRef<Path>, diff --git a/src/ui/routes/ch/channel.rs b/src/ui/routes/ch/channel.rs index a338f1f..a854f14 100644 --- a/src/ui/routes/ch/channel.rs +++ b/src/ui/routes/ch/channel.rs @@ -6,7 +6,7 @@ pub mod get { use crate::{ app::App, - channel, + channel::{self, app}, error::Internal, token::extract::Identity, ui::{ @@ -21,18 +21,14 @@ pub mod get { Path(channel): Path<channel::Id>, ) -> Result<Asset, Error> { let _ = identity.ok_or(Error::NotLoggedIn)?; - app.channels() - .get(&channel) - .await - .map_err(Error::internal)? - .ok_or(Error::NotFound)?; + app.channels().get(&channel).await.map_err(Error::from)?; Assets::index().map_err(Error::Internal) } #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("requested channel not found")] + #[error("channel not found")] NotFound, #[error("not logged in")] NotLoggedIn, @@ -40,9 +36,12 @@ pub mod get { Internal(Internal), } - impl Error { - fn internal(err: impl Into<Internal>) -> Self { - Self::Internal(err.into()) + impl From<app::Error> for Error { + fn from(error: app::Error) -> Self { + match error { + app::Error::NotFound(_) | app::Error::Deleted(_) => Self::NotFound, + other => Self::Internal(other.into()), + } } } |
