summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/app.rs2
-rw-r--r--src/event/routes/test.rs459
-rw-r--r--src/event/routes/test/channel.rs212
-rw-r--r--src/event/routes/test/invite.rs82
-rw-r--r--src/event/routes/test/message.rs316
-rw-r--r--src/event/routes/test/mod.rs6
-rw-r--r--src/event/routes/test/resume.rs220
-rw-r--r--src/event/routes/test/setup.rs46
-rw-r--r--src/event/routes/test/token.rs97
-rw-r--r--src/invite/app.rs10
-rw-r--r--src/test/fixtures/channel.rs7
-rw-r--r--src/test/fixtures/event.rs8
-rw-r--r--src/test/fixtures/login.rs16
-rw-r--r--src/test/fixtures/message.rs16
-rw-r--r--src/test/fixtures/mod.rs1
15 files changed, 1025 insertions, 473 deletions
diff --git a/src/app.rs b/src/app.rs
index 6d71259..bc1daa5 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -44,7 +44,7 @@ impl App {
}
pub const fn invites(&self) -> Invites {
- Invites::new(&self.db)
+ Invites::new(&self.db, &self.events)
}
#[cfg(not(test))]
diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs
deleted file mode 100644
index 49f8094..0000000
--- a/src/event/routes/test.rs
+++ /dev/null
@@ -1,459 +0,0 @@
-use axum::extract::State;
-use axum_extra::extract::Query;
-use futures::{
- future,
- stream::{self, StreamExt as _},
-};
-
-use super::get;
-use crate::{
- event::Sequenced as _,
- test::fixtures::{self, future::Immediately as _},
-};
-
-#[tokio::test]
-async fn includes_historical_message() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let event = events
- .filter_map(fixtures::message::events)
- .next()
- .immediately()
- .await
- .expect("delivered stored message");
-
- assert!(fixtures::event::message_sent(&event, &message));
-}
-
-#[tokio::test]
-async fn includes_live_message() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the semantics
-
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
- let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
-
- let event = events
- .filter_map(fixtures::message::events)
- .next()
- .immediately()
- .await
- .expect("delivered live message");
-
- assert!(fixtures::event::message_sent(&event, &message));
-}
-
-#[tokio::test]
-async fn includes_multiple_channels() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
-
- let channels = [
- fixtures::channel::create(&app, &fixtures::now()).await,
- fixtures::channel::create(&app, &fixtures::now()).await,
- ];
-
- let messages = stream::iter(channels)
- .then(|channel| {
- let app = app.clone();
- let sender = sender.clone();
- let channel = channel.clone();
- async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await }
- })
- .collect::<Vec<_>>()
- .await;
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let events = events
- .filter_map(fixtures::message::events)
- .take(messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- for message in &messages {
- assert!(events
- .iter()
- .any(|event| fixtures::event::message_sent(event, message)));
- }
-}
-
-#[tokio::test]
-async fn sequential_messages() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
-
- let messages = vec![
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let mut events = events
- .filter_map(fixtures::message::events)
- .filter(|event| {
- future::ready(
- messages
- .iter()
- .any(|message| fixtures::event::message_sent(event, message)),
- )
- });
-
- // Verify delivery in order
- for message in &messages {
- let event = events
- .next()
- .immediately()
- .await
- .expect("undelivered messages remaining");
-
- assert!(fixtures::event::message_sent(&event, message));
- }
-}
-
-#[tokio::test]
-async fn resumes_from() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
-
- let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
-
- let later_messages = vec![
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let resume_at = {
- // First subscription
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- let event = events
- .filter_map(fixtures::message::events)
- .next()
- .immediately()
- .await
- .expect("delivered events");
-
- assert!(fixtures::event::message_sent(&event, &initial_message));
-
- event.sequence()
- };
-
- // Resume after disconnect
- let get::Response(resumed) = get::handler(
- State(app),
- subscriber,
- Some(resume_at.into()),
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let events = resumed
- .filter_map(fixtures::message::events)
- .take(later_messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- for message in &later_messages {
- assert!(events
- .iter()
- .any(|event| fixtures::event::message_sent(event, message)));
- }
-}
-
-// This test verifies a real bug I hit developing the vector-of-sequences
-// approach to resuming events. A small omission caused the event IDs in a
-// resumed stream to _omit_ channels that were in the original stream until
-// those channels also appeared in the resumed stream.
-//
-// Clients would see something like
-// * In the original stream, Cfoo=5,Cbar=8
-// * In the resumed stream, Cfoo=6 (no Cbar sequence number)
-//
-// Disconnecting and reconnecting a second time, using event IDs from that
-// initial period of the first resume attempt, would then cause the second
-// resume attempt to restart all other channels from the beginning, and not
-// from where the first disconnection happened.
-//
-// This is a real and valid behaviour for clients!
-#[tokio::test]
-async fn serial_resume() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
- let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
- let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let resume_at = {
- let initial_messages = [
- fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
- ];
-
- // First subscription
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- let events = events
- .filter_map(fixtures::message::events)
- .take(initial_messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- for message in &initial_messages {
- assert!(events
- .iter()
- .any(|event| fixtures::event::message_sent(event, message)));
- }
-
- let event = events.last().expect("this vec is non-empty");
-
- event.sequence()
- };
-
- // Resume after disconnect
- let resume_at = {
- let resume_messages = [
- // Note that channel_b does not appear here. The buggy behaviour
- // would be masked if channel_b happened to send a new message
- // into the resumed event stream.
- fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
- ];
-
- // Second subscription
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- Some(resume_at.into()),
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- let events = events
- .filter_map(fixtures::message::events)
- .take(resume_messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- for message in &resume_messages {
- assert!(events
- .iter()
- .any(|event| fixtures::event::message_sent(event, message)));
- }
-
- let event = events.last().expect("this vec is non-empty");
-
- event.sequence()
- };
-
- // Resume after disconnect a second time
- {
- // At this point, we can send on either channel and demonstrate the
- // problem. The resume point should before both of these messages, but
- // after _all_ prior messages.
- let final_messages = [
- fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
- ];
-
- // Third subscription
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- Some(resume_at.into()),
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- let events = events
- .filter_map(fixtures::message::events)
- .take(final_messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- // This set of messages, in particular, _should not_ include any prior
- // messages from `initial_messages` or `resume_messages`.
- for message in &final_messages {
- assert!(events
- .iter()
- .any(|event| fixtures::event::message_sent(event, message)));
- }
- };
-}
-
-#[tokio::test]
-async fn terminates_on_token_expiry() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
-
- // Subscribe via the endpoint
-
- let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
- let subscriber =
- fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await;
-
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the resulting stream's behaviour
-
- app.tokens()
- .expire(&fixtures::now())
- .await
- .expect("expiring tokens succeeds");
-
- // These should not be delivered.
- let messages = [
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- assert!(events
- .filter_map(fixtures::message::events)
- .filter(|event| future::ready(
- messages
- .iter()
- .any(|message| fixtures::event::message_sent(event, message))
- ))
- .next()
- .immediately()
- .await
- .is_none());
-}
-
-#[tokio::test]
-async fn terminates_on_logout() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
-
- // Subscribe via the endpoint
-
- let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let get::Response(events) = get::handler(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify the resulting stream's behaviour
-
- app.tokens()
- .logout(&subscriber.token)
- .await
- .expect("expiring tokens succeeds");
-
- // These should not be delivered.
- let messages = [
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
- ];
-
- assert!(events
- .filter_map(fixtures::message::events)
- .filter(|event| future::ready(
- messages
- .iter()
- .any(|message| fixtures::event::message_sent(event, message))
- ))
- .next()
- .immediately()
- .await
- .is_none());
-}
diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs
new file mode 100644
index 0000000..ac45bfc
--- /dev/null
+++ b/src/event/routes/test/channel.rs
@@ -0,0 +1,212 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::{
+ event::routes::get,
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn creating() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Create a channel
+
+ let name = fixtures::channel::propose();
+ let channel = app
+ .channels()
+ .create(&name, &fixtures::now())
+ .await
+ .expect("creating a channel succeeds");
+
+ // Verify channel created event
+
+ let _ = events
+ .filter_map(fixtures::channel::events)
+ .filter_map(fixtures::channel::created)
+ .filter(|event| future::ready(event.channel == channel))
+ .next()
+ .immediately()
+ .await
+ .expect("channel created event is delivered");
+}
+
+#[tokio::test]
+async fn previously_created() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Create a channel
+
+ let name = fixtures::channel::propose();
+ let channel = app
+ .channels()
+ .create(&name, &fixtures::now())
+ .await
+ .expect("creating a channel succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify channel created event
+
+ let _ = events
+ .filter_map(fixtures::channel::events)
+ .filter_map(fixtures::channel::created)
+ .filter(|event| future::ready(event.channel == channel))
+ .next()
+ .immediately()
+ .await
+ .expect("channel created event is delivered");
+}
+
+#[tokio::test]
+async fn expiring() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Expire channels
+
+ app.channels()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring channels always succeeds");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::channel::events)
+ .filter_map(fixtures::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .immediately()
+ .await
+ .expect("a deleted channel event will be delivered");
+}
+
+#[tokio::test]
+async fn previously_expired() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+
+ // Expire channels
+
+ app.channels()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring channels always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::channel::events)
+ .filter_map(fixtures::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .immediately()
+ .await
+ .expect("a deleted channel event will be delivered");
+}
+
+#[tokio::test]
+async fn deleting() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Delete the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::now())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ // Check for delete event
+ let _ = events
+ .filter_map(fixtures::channel::events)
+ .filter_map(fixtures::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .immediately()
+ .await
+ .expect("a deleted channel event will be delivered");
+}
+
+#[tokio::test]
+async fn previously_deleted() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ // Delete the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::now())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .immediately()
+ .await
+ .expect("a deleted message will be delivered");
+}
diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs
new file mode 100644
index 0000000..10e4521
--- /dev/null
+++ b/src/event/routes/test/invite.rs
@@ -0,0 +1,82 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::{
+ event::routes::get,
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn accepting_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::login::create(&app, &fixtures::now()).await;
+ let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Accept the invite
+
+ let (name, password) = fixtures::login::propose();
+ let (joiner, _) = app
+ .invites()
+ .accept(&invite.id, &name, &password, &fixtures::now())
+ .await
+ .expect("accepting an invite succeeds");
+
+ // Expect a login created event
+
+ let _ = events
+ .filter_map(fixtures::login::events)
+ .filter_map(fixtures::login::created)
+ .filter(|event| future::ready(event.login == joiner))
+ .next()
+ .immediately()
+ .await
+ .expect("a login created event is sent");
+}
+
+#[tokio::test]
+async fn previously_accepted_invite() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let issuer = fixtures::login::create(&app, &fixtures::now()).await;
+ let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+
+ // Accept the invite
+
+ let (name, password) = fixtures::login::propose();
+ let (joiner, _) = app
+ .invites()
+ .accept(&invite.id, &name, &password, &fixtures::now())
+ .await
+ .expect("accepting an invite succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Expect a login created event
+
+ let _ = events
+ .filter_map(fixtures::login::events)
+ .filter_map(fixtures::login::created)
+ .filter(|event| future::ready(event.login == joiner))
+ .next()
+ .immediately()
+ .await
+ .expect("a login created event is sent");
+}
diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs
new file mode 100644
index 0000000..9bbbc7d
--- /dev/null
+++ b/src/event/routes/test/message.rs
@@ -0,0 +1,316 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+};
+
+use crate::{
+ event::routes::get,
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn sending() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Send a message
+
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = app
+ .messages()
+ .send(
+ &channel.id,
+ &sender,
+ &fixtures::now(),
+ &fixtures::message::propose(),
+ )
+ .await
+ .expect("sending a message succeeds");
+
+ // Verify that an event is delivered
+
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .filter(|event| future::ready(event.message == message))
+ .next()
+ .immediately()
+ .await
+ .expect("delivered message sent event");
+}
+
+#[tokio::test]
+async fn previously_sent() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Send a message
+
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = app
+ .messages()
+ .send(
+ &channel.id,
+ &sender,
+ &fixtures::now(),
+ &fixtures::message::propose(),
+ )
+ .await
+ .expect("sending a message succeeds");
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify that an event is delivered
+
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .filter(|event| future::ready(event.message == message))
+ .next()
+ .immediately()
+ .await
+ .expect("delivered message sent event");
+}
+
+#[tokio::test]
+async fn sent_in_multiple_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ let channels = [
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ ];
+
+ let messages = stream::iter(channels)
+ .then(|channel| {
+ let app = app.clone();
+ let sender = sender.clone();
+ let channel = channel.clone();
+ async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await }
+ })
+ .collect::<Vec<_>>()
+ .await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the structure of the response.
+
+ let events = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .take(messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in &messages {
+ assert!(events.iter().any(|event| &event.message == message));
+ }
+}
+
+#[tokio::test]
+async fn sent_sequentially() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ let messages = vec![
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the expected events in the expected order
+
+ let mut events = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)));
+
+ for message in &messages {
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("undelivered messages remaining");
+
+ assert_eq!(message, &event.message);
+ }
+}
+
+#[tokio::test]
+async fn expiring() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Expire messages
+
+ app.messages()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring messages always succeeds");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .immediately()
+ .await
+ .expect("a deleted message event will be delivered");
+}
+
+#[tokio::test]
+async fn previously_expired() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+
+ // Expire messages
+
+ app.messages()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring messages always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .immediately()
+ .await
+ .expect("a deleted message event will be delivered");
+}
+
+#[tokio::test]
+async fn deleting() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Delete the message
+
+ app.messages()
+ .delete(&message.id, &fixtures::now())
+ .await
+ .expect("deleting a valid message succeeds");
+
+ // Check for delete event
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .immediately()
+ .await
+ .expect("a deleted message event will be delivered");
+}
+
+#[tokio::test]
+async fn previously_deleted() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ // Delete the message
+
+ app.messages()
+ .delete(&message.id, &fixtures::now())
+ .await
+ .expect("deleting a valid message succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for delete event
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .immediately()
+ .await
+ .expect("a deleted message event will be delivered");
+}
diff --git a/src/event/routes/test/mod.rs b/src/event/routes/test/mod.rs
new file mode 100644
index 0000000..e7e35f1
--- /dev/null
+++ b/src/event/routes/test/mod.rs
@@ -0,0 +1,6 @@
+mod channel;
+mod invite;
+mod message;
+mod resume;
+mod setup;
+mod token;
diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs
new file mode 100644
index 0000000..c393d38
--- /dev/null
+++ b/src/event/routes/test/resume.rs
@@ -0,0 +1,220 @@
+use std::future;
+
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::stream::{self, StreamExt as _};
+
+use crate::{
+ event::{routes::get, Sequenced as _},
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn resume() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ let later_messages = vec![
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let resume_at = {
+ // First subscription
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ let event = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .filter(|event| future::ready(event.message == initial_message))
+ .next()
+ .immediately()
+ .await
+ .expect("delivered events");
+
+ event.sequence()
+ };
+
+ // Resume after disconnect
+ let get::Response(resumed) = get::handler(
+ State(app),
+ subscriber,
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify final events
+
+ let mut events = resumed
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .zip(stream::iter(later_messages));
+
+ while let Some((event, message)) = events.next().immediately().await {
+ assert_eq!(message, event.message);
+ }
+}
+
+// This test verifies a real bug I hit developing the vector-of-sequences
+// approach to resuming events. A small omission caused the event IDs in a
+// resumed stream to _omit_ channels that were in the original stream until
+// those channels also appeared in the resumed stream.
+//
+// Clients would see something like
+// * In the original stream, Cfoo=5,Cbar=8
+// * In the resumed stream, Cfoo=6 (no Cbar sequence number)
+//
+// Disconnecting and reconnecting a second time, using event IDs from that
+// initial period of the first resume attempt, would then cause the second
+// resume attempt to restart all other channels from the beginning, and not
+// from where the first disconnection happened.
+//
+// As we have switched to a single global event sequence number, this scenario
+// can no longer arise, but this test is preserved because the actual behaviour
+// _is_ a valid way for clients to behave, and should work. We might as well
+// keep testing it.
+#[tokio::test]
+async fn serial_resume() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
+ let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let resume_at = {
+ let initial_messages = [
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
+ ];
+
+ // First subscription
+
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expected events
+
+ let events = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .zip(stream::iter(initial_messages))
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ assert!(events
+ .iter()
+ .all(|(event, message)| message == &event.message));
+
+ let (event, _) = events.last().expect("this vec is non-empty");
+
+ // Take the last one's resume point
+
+ event.sequence()
+ };
+
+ // Resume after disconnect
+ let resume_at = {
+ let resume_messages = [
+ // Note that channel_b does not appear here. The buggy behaviour
+ // would be masked if channel_b happened to send a new message
+ // into the resumed event stream.
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ ];
+
+ // Second subscription
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expected events
+
+ let events = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .zip(stream::iter(resume_messages))
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ assert!(events
+ .iter()
+ .all(|(event, message)| message == &event.message));
+
+ let (event, _) = events.last().expect("this vec is non-empty");
+
+ // Take the last one's resume point
+
+ event.sequence()
+ };
+
+ // Resume after disconnect a second time
+ {
+ // At this point, we can send on either channel and demonstrate the
+ // problem. The resume point should before both of these messages, but
+ // after _all_ prior messages.
+ let final_messages = [
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
+ ];
+
+ // Third subscription
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expected events
+
+ let events = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .zip(stream::iter(final_messages))
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ assert!(events
+ .iter()
+ .all(|(event, message)| message == &event.message));
+ };
+}
diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs
new file mode 100644
index 0000000..234c2d9
--- /dev/null
+++ b/src/event/routes/test/setup.rs
@@ -0,0 +1,46 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::{
+ event::routes::get,
+ test::fixtures::{self, future::Immediately as _},
+};
+
+// There's no test for this in subscribe-then-setup order because creating an
+// identity to subscribe with also completes initial setup, preventing the
+// test from running. That is also a can't-happen scenario in reality.
+#[tokio::test]
+async fn previously_completed() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Complete initial setup
+
+ let (name, password) = fixtures::login::propose();
+ let (owner, _) = app
+ .setup()
+ .initial(&name, &password, &fixtures::now())
+ .await
+ .expect("initial setup in an empty app succeeds");
+
+ // Subscribe to events
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Expect a login created event
+
+ let _ = events
+ .filter_map(fixtures::login::events)
+ .filter_map(fixtures::login::created)
+ .filter(|event| future::ready(event.login == owner))
+ .next()
+ .immediately()
+ .await
+ .expect("a login created event is sent");
+}
diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs
new file mode 100644
index 0000000..a545988
--- /dev/null
+++ b/src/event/routes/test/token.rs
@@ -0,0 +1,97 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::{
+ event::routes::get,
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn terminates_on_token_expiry() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
+ let subscriber =
+ fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await;
+
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.tokens()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ assert!(events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
+ .next()
+ .immediately()
+ .await
+ .is_none());
+}
+
+#[tokio::test]
+async fn terminates_on_logout() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.tokens()
+ .logout(&subscriber.token)
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ assert!(events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
+ .next()
+ .immediately()
+ .await
+ .is_none());
+}
diff --git a/src/invite/app.rs b/src/invite/app.rs
index 65e7721..176075f 100644
--- a/src/invite/app.rs
+++ b/src/invite/app.rs
@@ -5,7 +5,7 @@ use super::{repo::Provider as _, Id, Invite, Summary};
use crate::{
clock::DateTime,
db::{Duplicate as _, NotFound as _},
- event::repo::Provider as _,
+ event::{repo::Provider as _, Broadcaster, Event},
login::{repo::Provider as _, Login, Password},
name::Name,
token::{repo::Provider as _, Secret},
@@ -13,11 +13,12 @@ use crate::{
pub struct Invites<'a> {
db: &'a SqlitePool,
+ events: &'a Broadcaster,
}
impl<'a> Invites<'a> {
- pub const fn new(db: &'a SqlitePool) -> Self {
- Self { db }
+ pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self {
+ Self { db, events }
}
pub async fn issue(&self, issuer: &Login, issued_at: &DateTime) -> Result<Invite, sqlx::Error> {
@@ -69,6 +70,9 @@ impl<'a> Invites<'a> {
let secret = tx.tokens().issue(&login, accepted_at).await?;
tx.commit().await?;
+ self.events
+ .broadcast(login.events().map(Event::from).collect::<Vec<_>>());
+
Ok((login.as_created(), secret))
}
diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs
index 8cb38ae..1fd8d23 100644
--- a/src/test/fixtures/channel.rs
+++ b/src/test/fixtures/channel.rs
@@ -45,6 +45,13 @@ pub fn created(event: channel::Event) -> future::Ready<Option<channel::event::Cr
})
}
+pub fn deleted(event: channel::Event) -> future::Ready<Option<channel::event::Deleted>> {
+ future::ready(match event {
+ channel::Event::Deleted(event) => Some(event),
+ channel::Event::Created(_) => None,
+ })
+}
+
pub fn fictitious() -> channel::Id {
channel::Id::generate()
}
diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs
deleted file mode 100644
index fa4fbc0..0000000
--- a/src/test/fixtures/event.rs
+++ /dev/null
@@ -1,8 +0,0 @@
-use crate::message::{Event, Message};
-
-pub fn message_sent(event: &Event, message: &Message) -> bool {
- matches!(
- &event,
- Event::Sent(event) if message == &event.into()
- )
-}
diff --git a/src/test/fixtures/login.rs b/src/test/fixtures/login.rs
index e308289..cbcbdd4 100644
--- a/src/test/fixtures/login.rs
+++ b/src/test/fixtures/login.rs
@@ -1,9 +1,12 @@
+use std::future::{self, Ready};
+
use faker_rand::en_us::internet;
use uuid::Uuid;
use crate::{
app::App,
clock::RequestedAt,
+ event::Event,
login::{self, Login, Password},
name::Name,
};
@@ -45,3 +48,16 @@ fn propose_name() -> Name {
pub fn propose_password() -> Password {
Uuid::new_v4().to_string().into()
}
+
+pub fn events(event: Event) -> Ready<Option<login::Event>> {
+ future::ready(match event {
+ Event::Login(event) => Some(event),
+ _ => None,
+ })
+}
+
+pub fn created(event: login::Event) -> Ready<Option<login::event::Created>> {
+ future::ready(match event {
+ login::Event::Created(event) => Some(event),
+ })
+}
diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs
index 3aebdd9..8cb50c1 100644
--- a/src/test/fixtures/message.rs
+++ b/src/test/fixtures/message.rs
@@ -8,7 +8,7 @@ use crate::{
clock::RequestedAt,
event::Event,
login::Login,
- message::{self, Body, Message},
+ message::{self, event, Body, Message},
};
pub async fn send(app: &App, channel: &Channel, sender: &Login, sent_at: &RequestedAt) -> Message {
@@ -31,6 +31,20 @@ pub fn events(event: Event) -> future::Ready<Option<message::Event>> {
})
}
+pub fn sent(event: message::Event) -> future::Ready<Option<event::Sent>> {
+ future::ready(match event {
+ message::Event::Sent(event) => Some(event),
+ message::Event::Deleted(_) => None,
+ })
+}
+
+pub fn deleted(event: message::Event) -> future::Ready<Option<event::Deleted>> {
+ future::ready(match event {
+ message::Event::Deleted(event) => Some(event),
+ message::Event::Sent(_) => None,
+ })
+}
+
pub fn fictitious() -> message::Id {
message::Id::generate()
}
diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs
index 2b7b6af..cf30e02 100644
--- a/src/test/fixtures/mod.rs
+++ b/src/test/fixtures/mod.rs
@@ -4,7 +4,6 @@ use crate::{app::App, clock::RequestedAt, db};
pub mod channel;
pub mod cookie;
-pub mod event;
pub mod future;
pub mod identity;
pub mod invite;