From f9cbf95e5b850a7407c34f936c0f858520682a5d Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Thu, 24 Oct 2024 19:49:54 -0400 Subject: Tests for retrieving invites --- src/test/fixtures/mod.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/test/fixtures/mod.rs') diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 9111811..2b7b6af 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -7,6 +7,7 @@ pub mod cookie; pub mod event; pub mod future; pub mod identity; +pub mod invite; pub mod login; pub mod message; -- cgit v1.2.3 From 0bb17bd01640492db2685e67bacac12dd54a9f59 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Thu, 24 Oct 2024 22:37:22 -0400 Subject: Tests for channel, invite, setup, and message deletion events. This also found a bug! No live event was being emitted during invite accept. The only way to find out about invites was to reconnect. --- src/app.rs | 2 +- src/event/routes/test.rs | 459 --------------------------------------- src/event/routes/test/channel.rs | 212 ++++++++++++++++++ src/event/routes/test/invite.rs | 82 +++++++ src/event/routes/test/message.rs | 316 +++++++++++++++++++++++++++ src/event/routes/test/mod.rs | 6 + src/event/routes/test/resume.rs | 220 +++++++++++++++++++ src/event/routes/test/setup.rs | 46 ++++ src/event/routes/test/token.rs | 97 +++++++++ src/invite/app.rs | 10 +- src/test/fixtures/channel.rs | 7 + src/test/fixtures/event.rs | 8 - src/test/fixtures/login.rs | 16 ++ src/test/fixtures/message.rs | 16 +- src/test/fixtures/mod.rs | 1 - 15 files changed, 1025 insertions(+), 473 deletions(-) delete mode 100644 src/event/routes/test.rs create mode 100644 src/event/routes/test/channel.rs create mode 100644 src/event/routes/test/invite.rs create mode 100644 src/event/routes/test/message.rs create mode 100644 src/event/routes/test/mod.rs create mode 100644 src/event/routes/test/resume.rs create mode 100644 src/event/routes/test/setup.rs create mode 100644 src/event/routes/test/token.rs delete mode 100644 src/test/fixtures/event.rs (limited to 'src/test/fixtures/mod.rs') diff --git a/src/app.rs b/src/app.rs index 6d71259..bc1daa5 100644 --- a/src/app.rs +++ b/src/app.rs @@ -44,7 +44,7 @@ impl App { } pub const fn invites(&self) -> Invites { - Invites::new(&self.db) + Invites::new(&self.db, &self.events) } #[cfg(not(test))] diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs deleted file mode 100644 index 49f8094..0000000 --- a/src/event/routes/test.rs +++ /dev/null @@ -1,459 +0,0 @@ -use axum::extract::State; -use axum_extra::extract::Query; -use futures::{ - future, - stream::{self, StreamExt as _}, -}; - -use super::get; -use crate::{ - event::Sequenced as _, - test::fixtures::{self, future::Immediately as _}, -}; - -#[tokio::test] -async fn includes_historical_message() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let event = events - .filter_map(fixtures::message::events) - .next() - .immediately() - .await - .expect("delivered stored message"); - - assert!(fixtures::event::message_sent(&event, &message)); -} - -#[tokio::test] -async fn includes_live_message() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the semantics - - let sender = fixtures::login::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - let event = events - .filter_map(fixtures::message::events) - .next() - .immediately() - .await - .expect("delivered live message"); - - assert!(fixtures::event::message_sent(&event, &message)); -} - -#[tokio::test] -async fn includes_multiple_channels() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - let channels = [ - fixtures::channel::create(&app, &fixtures::now()).await, - fixtures::channel::create(&app, &fixtures::now()).await, - ]; - - let messages = stream::iter(channels) - .then(|channel| { - let app = app.clone(); - let sender = sender.clone(); - let channel = channel.clone(); - async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } - }) - .collect::>() - .await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let events = events - .filter_map(fixtures::message::events) - .take(messages.len()) - .collect::>() - .immediately() - .await; - - for message in &messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } -} - -#[tokio::test] -async fn sequential_messages() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - let messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let mut events = events - .filter_map(fixtures::message::events) - .filter(|event| { - future::ready( - messages - .iter() - .any(|message| fixtures::event::message_sent(event, message)), - ) - }); - - // Verify delivery in order - for message in &messages { - let event = events - .next() - .immediately() - .await - .expect("undelivered messages remaining"); - - assert!(fixtures::event::message_sent(&event, message)); - } -} - -#[tokio::test] -async fn resumes_from() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - let later_messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let resume_at = { - // First subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query::default(), - ) - .await - .expect("subscribe never fails"); - - let event = events - .filter_map(fixtures::message::events) - .next() - .immediately() - .await - .expect("delivered events"); - - assert!(fixtures::event::message_sent(&event, &initial_message)); - - event.sequence() - }; - - // Resume after disconnect - let get::Response(resumed) = get::handler( - State(app), - subscriber, - Some(resume_at.into()), - Query::default(), - ) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let events = resumed - .filter_map(fixtures::message::events) - .take(later_messages.len()) - .collect::>() - .immediately() - .await; - - for message in &later_messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } -} - -// This test verifies a real bug I hit developing the vector-of-sequences -// approach to resuming events. A small omission caused the event IDs in a -// resumed stream to _omit_ channels that were in the original stream until -// those channels also appeared in the resumed stream. -// -// Clients would see something like -// * In the original stream, Cfoo=5,Cbar=8 -// * In the resumed stream, Cfoo=6 (no Cbar sequence number) -// -// Disconnecting and reconnecting a second time, using event IDs from that -// initial period of the first resume attempt, would then cause the second -// resume attempt to restart all other channels from the beginning, and not -// from where the first disconnection happened. -// -// This is a real and valid behaviour for clients! -#[tokio::test] -async fn serial_resume() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; - let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let resume_at = { - let initial_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, - ]; - - // First subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query::default(), - ) - .await - .expect("subscribe never fails"); - - let events = events - .filter_map(fixtures::message::events) - .take(initial_messages.len()) - .collect::>() - .immediately() - .await; - - for message in &initial_messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } - - let event = events.last().expect("this vec is non-empty"); - - event.sequence() - }; - - // Resume after disconnect - let resume_at = { - let resume_messages = [ - // Note that channel_b does not appear here. The buggy behaviour - // would be masked if channel_b happened to send a new message - // into the resumed event stream. - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - ]; - - // Second subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - Some(resume_at.into()), - Query::default(), - ) - .await - .expect("subscribe never fails"); - - let events = events - .filter_map(fixtures::message::events) - .take(resume_messages.len()) - .collect::>() - .immediately() - .await; - - for message in &resume_messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } - - let event = events.last().expect("this vec is non-empty"); - - event.sequence() - }; - - // Resume after disconnect a second time - { - // At this point, we can send on either channel and demonstrate the - // problem. The resume point should before both of these messages, but - // after _all_ prior messages. - let final_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, - ]; - - // Third subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - Some(resume_at.into()), - Query::default(), - ) - .await - .expect("subscribe never fails"); - - let events = events - .filter_map(fixtures::message::events) - .take(final_messages.len()) - .collect::>() - .immediately() - .await; - - // This set of messages, in particular, _should not_ include any prior - // messages from `initial_messages` or `resume_messages`. - for message in &final_messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } - }; -} - -#[tokio::test] -async fn terminates_on_token_expiry() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - // Subscribe via the endpoint - - let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; - let subscriber = - fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; - - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the resulting stream's behaviour - - app.tokens() - .expire(&fixtures::now()) - .await - .expect("expiring tokens succeeds"); - - // These should not be delivered. - let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - assert!(events - .filter_map(fixtures::message::events) - .filter(|event| future::ready( - messages - .iter() - .any(|message| fixtures::event::message_sent(event, message)) - )) - .next() - .immediately() - .await - .is_none()); -} - -#[tokio::test] -async fn terminates_on_logout() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - // Subscribe via the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query::default(), - ) - .await - .expect("subscribe never fails"); - - // Verify the resulting stream's behaviour - - app.tokens() - .logout(&subscriber.token) - .await - .expect("expiring tokens succeeds"); - - // These should not be delivered. - let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - assert!(events - .filter_map(fixtures::message::events) - .filter(|event| future::ready( - messages - .iter() - .any(|message| fixtures::event::message_sent(event, message)) - )) - .next() - .immediately() - .await - .is_none()); -} diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs new file mode 100644 index 0000000..ac45bfc --- /dev/null +++ b/src/event/routes/test/channel.rs @@ -0,0 +1,212 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::{ + event::routes::get, + test::fixtures::{self, future::Immediately as _}, +}; + +#[tokio::test] +async fn creating() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Create a channel + + let name = fixtures::channel::propose(); + let channel = app + .channels() + .create(&name, &fixtures::now()) + .await + .expect("creating a channel succeeds"); + + // Verify channel created event + + let _ = events + .filter_map(fixtures::channel::events) + .filter_map(fixtures::channel::created) + .filter(|event| future::ready(event.channel == channel)) + .next() + .immediately() + .await + .expect("channel created event is delivered"); +} + +#[tokio::test] +async fn previously_created() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Create a channel + + let name = fixtures::channel::propose(); + let channel = app + .channels() + .create(&name, &fixtures::now()) + .await + .expect("creating a channel succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify channel created event + + let _ = events + .filter_map(fixtures::channel::events) + .filter_map(fixtures::channel::created) + .filter(|event| future::ready(event.channel == channel)) + .next() + .immediately() + .await + .expect("channel created event is delivered"); +} + +#[tokio::test] +async fn expiring() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Expire channels + + app.channels() + .expire(&fixtures::now()) + .await + .expect("expiring channels always succeeds"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::channel::events) + .filter_map(fixtures::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .immediately() + .await + .expect("a deleted channel event will be delivered"); +} + +#[tokio::test] +async fn previously_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + + // Expire channels + + app.channels() + .expire(&fixtures::now()) + .await + .expect("expiring channels always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::channel::events) + .filter_map(fixtures::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .immediately() + .await + .expect("a deleted channel event will be delivered"); +} + +#[tokio::test] +async fn deleting() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Delete the channel + + app.channels() + .delete(&channel.id, &fixtures::now()) + .await + .expect("deleting a valid channel succeeds"); + + // Check for delete event + let _ = events + .filter_map(fixtures::channel::events) + .filter_map(fixtures::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) + .next() + .immediately() + .await + .expect("a deleted channel event will be delivered"); +} + +#[tokio::test] +async fn previously_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + // Delete the channel + + app.channels() + .delete(&channel.id, &fixtures::now()) + .await + .expect("deleting a valid channel succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .immediately() + .await + .expect("a deleted message will be delivered"); +} diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs new file mode 100644 index 0000000..10e4521 --- /dev/null +++ b/src/event/routes/test/invite.rs @@ -0,0 +1,82 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::{ + event::routes::get, + test::fixtures::{self, future::Immediately as _}, +}; + +#[tokio::test] +async fn accepting_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::login::create(&app, &fixtures::now()).await; + let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Accept the invite + + let (name, password) = fixtures::login::propose(); + let (joiner, _) = app + .invites() + .accept(&invite.id, &name, &password, &fixtures::now()) + .await + .expect("accepting an invite succeeds"); + + // Expect a login created event + + let _ = events + .filter_map(fixtures::login::events) + .filter_map(fixtures::login::created) + .filter(|event| future::ready(event.login == joiner)) + .next() + .immediately() + .await + .expect("a login created event is sent"); +} + +#[tokio::test] +async fn previously_accepted_invite() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let issuer = fixtures::login::create(&app, &fixtures::now()).await; + let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + + // Accept the invite + + let (name, password) = fixtures::login::propose(); + let (joiner, _) = app + .invites() + .accept(&invite.id, &name, &password, &fixtures::now()) + .await + .expect("accepting an invite succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Expect a login created event + + let _ = events + .filter_map(fixtures::login::events) + .filter_map(fixtures::login::created) + .filter(|event| future::ready(event.login == joiner)) + .next() + .immediately() + .await + .expect("a login created event is sent"); +} diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs new file mode 100644 index 0000000..9bbbc7d --- /dev/null +++ b/src/event/routes/test/message.rs @@ -0,0 +1,316 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{ + future, + stream::{self, StreamExt as _}, +}; + +use crate::{ + event::routes::get, + test::fixtures::{self, future::Immediately as _}, +}; + +#[tokio::test] +async fn sending() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Send a message + + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let message = app + .messages() + .send( + &channel.id, + &sender, + &fixtures::now(), + &fixtures::message::propose(), + ) + .await + .expect("sending a message succeeds"); + + // Verify that an event is delivered + + let _ = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .filter(|event| future::ready(event.message == message)) + .next() + .immediately() + .await + .expect("delivered message sent event"); +} + +#[tokio::test] +async fn previously_sent() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + + // Send a message + + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let message = app + .messages() + .send( + &channel.id, + &sender, + &fixtures::now(), + &fixtures::message::propose(), + ) + .await + .expect("sending a message succeeds"); + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify that an event is delivered + + let _ = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .filter(|event| future::ready(event.message == message)) + .next() + .immediately() + .await + .expect("delivered message sent event"); +} + +#[tokio::test] +async fn sent_in_multiple_channels() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + + let channels = [ + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, + ]; + + let messages = stream::iter(channels) + .then(|channel| { + let app = app.clone(); + let sender = sender.clone(); + let channel = channel.clone(); + async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } + }) + .collect::>() + .await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify the structure of the response. + + let events = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .take(messages.len()) + .collect::>() + .immediately() + .await; + + for message in &messages { + assert!(events.iter().any(|event| &event.message == message)); + } +} + +#[tokio::test] +async fn sent_sequentially() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + + let messages = vec![ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify the expected events in the expected order + + let mut events = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))); + + for message in &messages { + let event = events + .next() + .immediately() + .await + .expect("undelivered messages remaining"); + + assert_eq!(message, &event.message); + } +} + +#[tokio::test] +async fn expiring() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let sender = fixtures::login::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Expire messages + + app.messages() + .expire(&fixtures::now()) + .await + .expect("expiring messages always succeeds"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .immediately() + .await + .expect("a deleted message event will be delivered"); +} + +#[tokio::test] +async fn previously_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let sender = fixtures::login::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + + // Expire messages + + app.messages() + .expire(&fixtures::now()) + .await + .expect("expiring messages always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .immediately() + .await + .expect("a deleted message event will be delivered"); +} + +#[tokio::test] +async fn deleting() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Delete the message + + app.messages() + .delete(&message.id, &fixtures::now()) + .await + .expect("deleting a valid message succeeds"); + + // Check for delete event + let _ = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .immediately() + .await + .expect("a deleted message event will be delivered"); +} + +#[tokio::test] +async fn previously_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + // Delete the message + + app.messages() + .delete(&message.id, &fixtures::now()) + .await + .expect("deleting a valid message succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Check for delete event + let _ = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::deleted) + .filter(|event| future::ready(event.id == message.id)) + .next() + .immediately() + .await + .expect("a deleted message event will be delivered"); +} diff --git a/src/event/routes/test/mod.rs b/src/event/routes/test/mod.rs new file mode 100644 index 0000000..e7e35f1 --- /dev/null +++ b/src/event/routes/test/mod.rs @@ -0,0 +1,6 @@ +mod channel; +mod invite; +mod message; +mod resume; +mod setup; +mod token; diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs new file mode 100644 index 0000000..c393d38 --- /dev/null +++ b/src/event/routes/test/resume.rs @@ -0,0 +1,220 @@ +use std::future; + +use axum::extract::State; +use axum_extra::extract::Query; +use futures::stream::{self, StreamExt as _}; + +use crate::{ + event::{routes::get, Sequenced as _}, + test::fixtures::{self, future::Immediately as _}, +}; + +#[tokio::test] +async fn resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + + let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + let later_messages = vec![ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let resume_at = { + // First subscription + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); + + let event = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .filter(|event| future::ready(event.message == initial_message)) + .next() + .immediately() + .await + .expect("delivered events"); + + event.sequence() + }; + + // Resume after disconnect + let get::Response(resumed) = get::handler( + State(app), + subscriber, + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Verify final events + + let mut events = resumed + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .zip(stream::iter(later_messages)); + + while let Some((event, message)) = events.next().immediately().await { + assert_eq!(message, event.message); + } +} + +// This test verifies a real bug I hit developing the vector-of-sequences +// approach to resuming events. A small omission caused the event IDs in a +// resumed stream to _omit_ channels that were in the original stream until +// those channels also appeared in the resumed stream. +// +// Clients would see something like +// * In the original stream, Cfoo=5,Cbar=8 +// * In the resumed stream, Cfoo=6 (no Cbar sequence number) +// +// Disconnecting and reconnecting a second time, using event IDs from that +// initial period of the first resume attempt, would then cause the second +// resume attempt to restart all other channels from the beginning, and not +// from where the first disconnection happened. +// +// As we have switched to a single global event sequence number, this scenario +// can no longer arise, but this test is preserved because the actual behaviour +// _is_ a valid way for clients to behave, and should work. We might as well +// keep testing it. +#[tokio::test] +async fn serial_resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; + let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let resume_at = { + let initial_messages = [ + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + ]; + + // First subscription + + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .zip(stream::iter(initial_messages)) + .collect::>() + .immediately() + .await; + + assert!(events + .iter() + .all(|(event, message)| message == &event.message)); + + let (event, _) = events.last().expect("this vec is non-empty"); + + // Take the last one's resume point + + event.sequence() + }; + + // Resume after disconnect + let resume_at = { + let resume_messages = [ + // Note that channel_b does not appear here. The buggy behaviour + // would be masked if channel_b happened to send a new message + // into the resumed event stream. + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + ]; + + // Second subscription + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .zip(stream::iter(resume_messages)) + .collect::>() + .immediately() + .await; + + assert!(events + .iter() + .all(|(event, message)| message == &event.message)); + + let (event, _) = events.last().expect("this vec is non-empty"); + + // Take the last one's resume point + + event.sequence() + }; + + // Resume after disconnect a second time + { + // At this point, we can send on either channel and demonstrate the + // problem. The resume point should before both of these messages, but + // after _all_ prior messages. + let final_messages = [ + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + ]; + + // Third subscription + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Check for expected events + + let events = events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .zip(stream::iter(final_messages)) + .collect::>() + .immediately() + .await; + + assert!(events + .iter() + .all(|(event, message)| message == &event.message)); + }; +} diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs new file mode 100644 index 0000000..234c2d9 --- /dev/null +++ b/src/event/routes/test/setup.rs @@ -0,0 +1,46 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::{ + event::routes::get, + test::fixtures::{self, future::Immediately as _}, +}; + +// There's no test for this in subscribe-then-setup order because creating an +// identity to subscribe with also completes initial setup, preventing the +// test from running. That is also a can't-happen scenario in reality. +#[tokio::test] +async fn previously_completed() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Complete initial setup + + let (name, password) = fixtures::login::propose(); + let (owner, _) = app + .setup() + .initial(&name, &password, &fixtures::now()) + .await + .expect("initial setup in an empty app succeeds"); + + // Subscribe to events + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Expect a login created event + + let _ = events + .filter_map(fixtures::login::events) + .filter_map(fixtures::login::created) + .filter(|event| future::ready(event.login == owner)) + .next() + .immediately() + .await + .expect("a login created event is sent"); +} diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs new file mode 100644 index 0000000..a545988 --- /dev/null +++ b/src/event/routes/test/token.rs @@ -0,0 +1,97 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::{ + event::routes::get, + test::fixtures::{self, future::Immediately as _}, +}; + +#[tokio::test] +async fn terminates_on_token_expiry() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + + // Subscribe via the endpoint + + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; + let subscriber = + fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; + + let get::Response(events) = + get::handler(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify the resulting stream's behaviour + + app.tokens() + .expire(&fixtures::now()) + .await + .expect("expiring tokens succeeds"); + + // These should not be delivered. + let messages = [ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + assert!(events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) + .next() + .immediately() + .await + .is_none()); +} + +#[tokio::test] +async fn terminates_on_logout() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + + // Subscribe via the endpoint + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Verify the resulting stream's behaviour + + app.tokens() + .logout(&subscriber.token) + .await + .expect("expiring tokens succeeds"); + + // These should not be delivered. + + let messages = [ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + assert!(events + .filter_map(fixtures::message::events) + .filter_map(fixtures::message::sent) + .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) + .next() + .immediately() + .await + .is_none()); +} diff --git a/src/invite/app.rs b/src/invite/app.rs index 65e7721..176075f 100644 --- a/src/invite/app.rs +++ b/src/invite/app.rs @@ -5,7 +5,7 @@ use super::{repo::Provider as _, Id, Invite, Summary}; use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, - event::repo::Provider as _, + event::{repo::Provider as _, Broadcaster, Event}, login::{repo::Provider as _, Login, Password}, name::Name, token::{repo::Provider as _, Secret}, @@ -13,11 +13,12 @@ use crate::{ pub struct Invites<'a> { db: &'a SqlitePool, + events: &'a Broadcaster, } impl<'a> Invites<'a> { - pub const fn new(db: &'a SqlitePool) -> Self { - Self { db } + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } } pub async fn issue(&self, issuer: &Login, issued_at: &DateTime) -> Result { @@ -69,6 +70,9 @@ impl<'a> Invites<'a> { let secret = tx.tokens().issue(&login, accepted_at).await?; tx.commit().await?; + self.events + .broadcast(login.events().map(Event::from).collect::>()); + Ok((login.as_created(), secret)) } diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs index 8cb38ae..1fd8d23 100644 --- a/src/test/fixtures/channel.rs +++ b/src/test/fixtures/channel.rs @@ -45,6 +45,13 @@ pub fn created(event: channel::Event) -> future::Ready future::Ready> { + future::ready(match event { + channel::Event::Deleted(event) => Some(event), + channel::Event::Created(_) => None, + }) +} + pub fn fictitious() -> channel::Id { channel::Id::generate() } diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs deleted file mode 100644 index fa4fbc0..0000000 --- a/src/test/fixtures/event.rs +++ /dev/null @@ -1,8 +0,0 @@ -use crate::message::{Event, Message}; - -pub fn message_sent(event: &Event, message: &Message) -> bool { - matches!( - &event, - Event::Sent(event) if message == &event.into() - ) -} diff --git a/src/test/fixtures/login.rs b/src/test/fixtures/login.rs index e308289..cbcbdd4 100644 --- a/src/test/fixtures/login.rs +++ b/src/test/fixtures/login.rs @@ -1,9 +1,12 @@ +use std::future::{self, Ready}; + use faker_rand::en_us::internet; use uuid::Uuid; use crate::{ app::App, clock::RequestedAt, + event::Event, login::{self, Login, Password}, name::Name, }; @@ -45,3 +48,16 @@ fn propose_name() -> Name { pub fn propose_password() -> Password { Uuid::new_v4().to_string().into() } + +pub fn events(event: Event) -> Ready> { + future::ready(match event { + Event::Login(event) => Some(event), + _ => None, + }) +} + +pub fn created(event: login::Event) -> Ready> { + future::ready(match event { + login::Event::Created(event) => Some(event), + }) +} diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs index 3aebdd9..8cb50c1 100644 --- a/src/test/fixtures/message.rs +++ b/src/test/fixtures/message.rs @@ -8,7 +8,7 @@ use crate::{ clock::RequestedAt, event::Event, login::Login, - message::{self, Body, Message}, + message::{self, event, Body, Message}, }; pub async fn send(app: &App, channel: &Channel, sender: &Login, sent_at: &RequestedAt) -> Message { @@ -31,6 +31,20 @@ pub fn events(event: Event) -> future::Ready> { }) } +pub fn sent(event: message::Event) -> future::Ready> { + future::ready(match event { + message::Event::Sent(event) => Some(event), + message::Event::Deleted(_) => None, + }) +} + +pub fn deleted(event: message::Event) -> future::Ready> { + future::ready(match event { + message::Event::Deleted(event) => Some(event), + message::Event::Sent(_) => None, + }) +} + pub fn fictitious() -> message::Id { message::Id::generate() } diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 2b7b6af..cf30e02 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -4,7 +4,6 @@ use crate::{app::App, clock::RequestedAt, db}; pub mod channel; pub mod cookie; -pub mod event; pub mod future; pub mod identity; pub mod invite; -- cgit v1.2.3 From eae0edb57e9ade7c73affb78baf2ae267b6290b8 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Thu, 24 Oct 2024 22:50:23 -0400 Subject: Consolidate test helper event functions --- src/channel/routes/channel/test/post.rs | 2 +- src/channel/routes/test.rs | 4 +- src/event/routes/test/channel.rs | 30 ++++++------- src/event/routes/test/invite.rs | 8 ++-- src/event/routes/test/message.rs | 32 ++++++------- src/event/routes/test/resume.rs | 20 ++++----- src/event/routes/test/setup.rs | 4 +- src/event/routes/test/token.rs | 8 ++-- src/test/fixtures/channel.rs | 24 ---------- src/test/fixtures/event.rs | 79 +++++++++++++++++++++++++++++++++ src/test/fixtures/login.rs | 16 ------- src/test/fixtures/message.rs | 26 +---------- src/test/fixtures/mod.rs | 1 + 13 files changed, 134 insertions(+), 120 deletions(-) create mode 100644 src/test/fixtures/event.rs (limited to 'src/test/fixtures/mod.rs') diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs index 67e7d36..d81715f 100644 --- a/src/channel/routes/channel/test/post.rs +++ b/src/channel/routes/channel/test/post.rs @@ -44,7 +44,7 @@ async fn messages_in_order() { .subscribe(None) .await .expect("subscribing to a valid channel succeeds") - .filter_map(fixtures::message::events) + .filter_map(fixtures::event::message) .take(requests.len()); let events = events.collect::>().immediately().await; diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs index 216eba1..46c58b0 100644 --- a/src/channel/routes/test.rs +++ b/src/channel/routes/test.rs @@ -48,8 +48,8 @@ async fn new_channel() { .subscribe(None) .await .expect("subscribing never fails") - .filter_map(fixtures::channel::events) - .filter_map(fixtures::channel::created) + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) .filter(|event| future::ready(event.channel == response)); let event = events diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs index ac45bfc..0ab28c4 100644 --- a/src/event/routes/test/channel.rs +++ b/src/event/routes/test/channel.rs @@ -33,8 +33,8 @@ async fn creating() { // Verify channel created event let _ = events - .filter_map(fixtures::channel::events) - .filter_map(fixtures::channel::created) + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) .filter(|event| future::ready(event.channel == channel)) .next() .immediately() @@ -68,8 +68,8 @@ async fn previously_created() { // Verify channel created event let _ = events - .filter_map(fixtures::channel::events) - .filter_map(fixtures::channel::created) + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) .filter(|event| future::ready(event.channel == channel)) .next() .immediately() @@ -101,8 +101,8 @@ async fn expiring() { // Check for expiry event let _ = events - .filter_map(fixtures::channel::events) - .filter_map(fixtures::channel::deleted) + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .immediately() @@ -134,8 +134,8 @@ async fn previously_expired() { // Check for expiry event let _ = events - .filter_map(fixtures::channel::events) - .filter_map(fixtures::channel::deleted) + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .immediately() @@ -167,8 +167,8 @@ async fn deleting() { // Check for delete event let _ = events - .filter_map(fixtures::channel::events) - .filter_map(fixtures::channel::deleted) + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .immediately() @@ -182,8 +182,6 @@ async fn previously_deleted() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; // Delete the channel @@ -202,11 +200,11 @@ async fn previously_deleted() { // Check for expiry event let _ = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::deleted) - .filter(|event| future::ready(event.id == message.id)) + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .filter(|event| future::ready(event.id == channel.id)) .next() .immediately() .await - .expect("a deleted message will be delivered"); + .expect("a deleted channel event will be delivered"); } diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs index 10e4521..afd3aeb 100644 --- a/src/event/routes/test/invite.rs +++ b/src/event/routes/test/invite.rs @@ -35,8 +35,8 @@ async fn accepting_invite() { // Expect a login created event let _ = events - .filter_map(fixtures::login::events) - .filter_map(fixtures::login::created) + .filter_map(fixtures::event::login) + .filter_map(fixtures::event::login::created) .filter(|event| future::ready(event.login == joiner)) .next() .immediately() @@ -72,8 +72,8 @@ async fn previously_accepted_invite() { // Expect a login created event let _ = events - .filter_map(fixtures::login::events) - .filter_map(fixtures::login::created) + .filter_map(fixtures::event::login) + .filter_map(fixtures::event::login::created) .filter(|event| future::ready(event.login == joiner)) .next() .immediately() diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs index 9bbbc7d..df42a89 100644 --- a/src/event/routes/test/message.rs +++ b/src/event/routes/test/message.rs @@ -42,8 +42,8 @@ async fn sending() { // Verify that an event is delivered let _ = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::sent) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) .filter(|event| future::ready(event.message == message)) .next() .immediately() @@ -83,8 +83,8 @@ async fn previously_sent() { // Verify that an event is delivered let _ = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::sent) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) .filter(|event| future::ready(event.message == message)) .next() .immediately() @@ -124,8 +124,8 @@ async fn sent_in_multiple_channels() { // Verify the structure of the response. let events = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::sent) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) .take(messages.len()) .collect::>() .immediately() @@ -160,8 +160,8 @@ async fn sent_sequentially() { // Verify the expected events in the expected order let mut events = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::sent) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))); for message in &messages { @@ -201,8 +201,8 @@ async fn expiring() { // Check for expiry event let _ = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::deleted) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .immediately() @@ -236,8 +236,8 @@ async fn previously_expired() { // Check for expiry event let _ = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::deleted) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .immediately() @@ -271,8 +271,8 @@ async fn deleting() { // Check for delete event let _ = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::deleted) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .immediately() @@ -306,8 +306,8 @@ async fn previously_deleted() { // Check for delete event let _ = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::deleted) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .immediately() diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs index c393d38..e4751bb 100644 --- a/src/event/routes/test/resume.rs +++ b/src/event/routes/test/resume.rs @@ -40,8 +40,8 @@ async fn resume() { .expect("subscribe never fails"); let event = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::sent) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) .filter(|event| future::ready(event.message == initial_message)) .next() .immediately() @@ -64,8 +64,8 @@ async fn resume() { // Verify final events let mut events = resumed - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::sent) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) .zip(stream::iter(later_messages)); while let Some((event, message)) = events.next().immediately().await { @@ -124,8 +124,8 @@ async fn serial_resume() { // Check for expected events let events = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::sent) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) .zip(stream::iter(initial_messages)) .collect::>() .immediately() @@ -165,8 +165,8 @@ async fn serial_resume() { // Check for expected events let events = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::sent) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) .zip(stream::iter(resume_messages)) .collect::>() .immediately() @@ -206,8 +206,8 @@ async fn serial_resume() { // Check for expected events let events = events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::sent) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) .zip(stream::iter(final_messages)) .collect::>() .immediately() diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs index 234c2d9..a54b65b 100644 --- a/src/event/routes/test/setup.rs +++ b/src/event/routes/test/setup.rs @@ -36,8 +36,8 @@ async fn previously_completed() { // Expect a login created event let _ = events - .filter_map(fixtures::login::events) - .filter_map(fixtures::login::created) + .filter_map(fixtures::event::login) + .filter_map(fixtures::event::login::created) .filter(|event| future::ready(event.login == owner)) .next() .immediately() diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs index a545988..577fabd 100644 --- a/src/event/routes/test/token.rs +++ b/src/event/routes/test/token.rs @@ -41,8 +41,8 @@ async fn terminates_on_token_expiry() { ]; assert!(events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::sent) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .immediately() @@ -87,8 +87,8 @@ async fn terminates_on_logout() { ]; assert!(events - .filter_map(fixtures::message::events) - .filter_map(fixtures::message::sent) + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .immediately() diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs index 1fd8d23..0c6480b 100644 --- a/src/test/fixtures/channel.rs +++ b/src/test/fixtures/channel.rs @@ -1,5 +1,3 @@ -use std::future; - use faker_rand::{ en_us::{addresses::CityName, names::FullName}, faker_impl_from_templates, @@ -10,7 +8,6 @@ use crate::{ app::App, channel::{self, Channel}, clock::RequestedAt, - event::Event, name::Name, }; @@ -31,27 +28,6 @@ faker_impl_from_templates! { NameTemplate; "{} {}", CityName, FullName; } -pub fn events(event: Event) -> future::Ready> { - future::ready(match event { - Event::Channel(channel) => Some(channel), - _ => None, - }) -} - -pub fn created(event: channel::Event) -> future::Ready> { - future::ready(match event { - channel::Event::Created(event) => Some(event), - channel::Event::Deleted(_) => None, - }) -} - -pub fn deleted(event: channel::Event) -> future::Ready> { - future::ready(match event { - channel::Event::Deleted(event) => Some(event), - channel::Event::Created(_) => None, - }) -} - pub fn fictitious() -> channel::Id { channel::Id::generate() } diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs new file mode 100644 index 0000000..de02d4d --- /dev/null +++ b/src/test/fixtures/event.rs @@ -0,0 +1,79 @@ +use std::future::{self, Ready}; + +use crate::event::Event; + +pub fn channel(event: Event) -> Ready> { + future::ready(match event { + Event::Channel(channel) => Some(channel), + _ => None, + }) +} + +pub fn message(event: Event) -> Ready> { + future::ready(match event { + Event::Message(event) => Some(event), + _ => None, + }) +} + +pub fn login(event: Event) -> Ready> { + future::ready(match event { + Event::Login(event) => Some(event), + _ => None, + }) +} + +pub mod channel { + use std::future::{self, Ready}; + + use crate::channel::event; + pub use crate::channel::Event; + + pub fn created(event: Event) -> Ready> { + future::ready(match event { + Event::Created(event) => Some(event), + Event::Deleted(_) => None, + }) + } + + pub fn deleted(event: Event) -> Ready> { + future::ready(match event { + Event::Deleted(event) => Some(event), + Event::Created(_) => None, + }) + } +} + +pub mod message { + use std::future::{self, Ready}; + + use crate::message::event; + pub use crate::message::Event; + + pub fn sent(event: Event) -> Ready> { + future::ready(match event { + Event::Sent(event) => Some(event), + Event::Deleted(_) => None, + }) + } + + pub fn deleted(event: Event) -> future::Ready> { + future::ready(match event { + Event::Deleted(event) => Some(event), + Event::Sent(_) => None, + }) + } +} + +pub mod login { + use std::future::{self, Ready}; + + use crate::login::event; + pub use crate::login::Event; + + pub fn created(event: Event) -> Ready> { + future::ready(match event { + Event::Created(event) => Some(event), + }) + } +} diff --git a/src/test/fixtures/login.rs b/src/test/fixtures/login.rs index cbcbdd4..e308289 100644 --- a/src/test/fixtures/login.rs +++ b/src/test/fixtures/login.rs @@ -1,12 +1,9 @@ -use std::future::{self, Ready}; - use faker_rand::en_us::internet; use uuid::Uuid; use crate::{ app::App, clock::RequestedAt, - event::Event, login::{self, Login, Password}, name::Name, }; @@ -48,16 +45,3 @@ fn propose_name() -> Name { pub fn propose_password() -> Password { Uuid::new_v4().to_string().into() } - -pub fn events(event: Event) -> Ready> { - future::ready(match event { - Event::Login(event) => Some(event), - _ => None, - }) -} - -pub fn created(event: login::Event) -> Ready> { - future::ready(match event { - login::Event::Created(event) => Some(event), - }) -} diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs index 8cb50c1..d3b4719 100644 --- a/src/test/fixtures/message.rs +++ b/src/test/fixtures/message.rs @@ -1,14 +1,11 @@ -use std::future; - use faker_rand::lorem::Paragraphs; use crate::{ app::App, channel::Channel, clock::RequestedAt, - event::Event, login::Login, - message::{self, event, Body, Message}, + message::{self, Body, Message}, }; pub async fn send(app: &App, channel: &Channel, sender: &Login, sent_at: &RequestedAt) -> Message { @@ -24,27 +21,6 @@ pub fn propose() -> Body { rand::random::().to_string().into() } -pub fn events(event: Event) -> future::Ready> { - future::ready(match event { - Event::Message(event) => Some(event), - _ => None, - }) -} - -pub fn sent(event: message::Event) -> future::Ready> { - future::ready(match event { - message::Event::Sent(event) => Some(event), - message::Event::Deleted(_) => None, - }) -} - -pub fn deleted(event: message::Event) -> future::Ready> { - future::ready(match event { - message::Event::Deleted(event) => Some(event), - message::Event::Sent(_) => None, - }) -} - pub fn fictitious() -> message::Id { message::Id::generate() } diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index cf30e02..2b7b6af 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -4,6 +4,7 @@ use crate::{app::App, clock::RequestedAt, db}; pub mod channel; pub mod cookie; +pub mod event; pub mod future; pub mod identity; pub mod invite; -- cgit v1.2.3