summaryrefslogtreecommitdiff
path: root/src/event/routes/test.rs
diff options
context:
space:
mode:
authorKit La Touche <kit@transneptune.net>2024-10-03 23:30:42 -0400
committerKit La Touche <kit@transneptune.net>2024-10-03 23:30:42 -0400
commitd50b1b56c011c03c7d8a95242af404b727e91a80 (patch)
treeefe3408f6a8ef669981826d1a29d16a24b460d89 /src/event/routes/test.rs
parent30c13478d61065a512f5bc8824fecbf2ee6afc81 (diff)
parent7f12fd41c2941a55a6437f24e4f780104a718790 (diff)
Merge branch 'main' into feature-frontend
Diffstat (limited to 'src/event/routes/test.rs')
-rw-r--r--src/event/routes/test.rs463
1 files changed, 463 insertions, 0 deletions
diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs
new file mode 100644
index 0000000..ba9953e
--- /dev/null
+++ b/src/event/routes/test.rs
@@ -0,0 +1,463 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+};
+
+use crate::{
+ event::{routes, Sequenced as _},
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn includes_historical_message() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+ let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the structure of the response.
+
+ let event = events
+ .filter(fixtures::filter::messages())
+ .next()
+ .immediately()
+ .await
+ .expect("delivered stored message");
+
+ assert!(fixtures::event::message_sent(&event, &message));
+}
+
+#[tokio::test]
+async fn includes_live_message() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+ let routes::Events(events) =
+ routes::events(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the semantics
+
+ let sender = fixtures::login::create(&app).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ let event = events
+ .filter(fixtures::filter::messages())
+ .next()
+ .immediately()
+ .await
+ .expect("delivered live message");
+
+ assert!(fixtures::event::message_sent(&event, &message));
+}
+
+#[tokio::test]
+async fn includes_multiple_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+
+ let channels = [
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ ];
+
+ let messages = stream::iter(channels)
+ .then(|channel| {
+ let app = app.clone();
+ let sender = sender.clone();
+ let channel = channel.clone();
+ async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await }
+ })
+ .collect::<Vec<_>>()
+ .await;
+
+ // Call the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+ let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the structure of the response.
+
+ let events = events
+ .filter(fixtures::filter::messages())
+ .take(messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in &messages {
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
+ }
+}
+
+#[tokio::test]
+async fn sequential_messages() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app).await;
+
+ let messages = vec![
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+ let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the structure of the response.
+
+ let mut events = events.filter(|event| {
+ future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message)),
+ )
+ });
+
+ // Verify delivery in order
+ for message in &messages {
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("undelivered messages remaining");
+
+ assert!(fixtures::event::message_sent(&event, message));
+ }
+}
+
+#[tokio::test]
+async fn resumes_from() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app).await;
+
+ let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ let later_messages = vec![
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+
+ let resume_at = {
+ // First subscription
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ let event = events
+ .filter(fixtures::filter::messages())
+ .next()
+ .immediately()
+ .await
+ .expect("delivered events");
+
+ assert!(fixtures::event::message_sent(&event, &initial_message));
+
+ event.sequence()
+ };
+
+ // Resume after disconnect
+ let routes::Events(resumed) = routes::events(
+ State(app),
+ subscriber,
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the structure of the response.
+
+ let events = resumed
+ .take(later_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in &later_messages {
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
+ }
+}
+
+// This test verifies a real bug I hit developing the vector-of-sequences
+// approach to resuming events. A small omission caused the event IDs in a
+// resumed stream to _omit_ channels that were in the original stream until
+// those channels also appeared in the resumed stream.
+//
+// Clients would see something like
+// * In the original stream, Cfoo=5,Cbar=8
+// * In the resumed stream, Cfoo=6 (no Cbar sequence number)
+//
+// Disconnecting and reconnecting a second time, using event IDs from that
+// initial period of the first resume attempt, would then cause the second
+// resume attempt to restart all other channels from the beginning, and not
+// from where the first disconnection happened.
+//
+// This is a real and valid behaviour for clients!
+#[tokio::test]
+async fn serial_resume() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
+ let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+
+ let resume_at = {
+ let initial_messages = [
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
+ ];
+
+ // First subscription
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ let events = events
+ .filter(fixtures::filter::messages())
+ .take(initial_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in &initial_messages {
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
+ }
+
+ let event = events.last().expect("this vec is non-empty");
+
+ event.sequence()
+ };
+
+ // Resume after disconnect
+ let resume_at = {
+ let resume_messages = [
+ // Note that channel_b does not appear here. The buggy behaviour
+ // would be masked if channel_b happened to send a new message
+ // into the resumed event stream.
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ ];
+
+ // Second subscription
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ let events = events
+ .filter(fixtures::filter::messages())
+ .take(resume_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in &resume_messages {
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
+ }
+
+ let event = events.last().expect("this vec is non-empty");
+
+ event.sequence()
+ };
+
+ // Resume after disconnect a second time
+ {
+ // At this point, we can send on either channel and demonstrate the
+ // problem. The resume point should before both of these messages, but
+ // after _all_ prior messages.
+ let final_messages = [
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
+ ];
+
+ // Third subscription
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ let events = events
+ .filter(fixtures::filter::messages())
+ .take(final_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ // This set of messages, in particular, _should not_ include any prior
+ // messages from `initial_messages` or `resume_messages`.
+ for message in &final_messages {
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
+ }
+ };
+}
+
+#[tokio::test]
+async fn terminates_on_token_expiry() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber =
+ fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await;
+
+ let routes::Events(events) =
+ routes::events(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.tokens()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ assert!(events
+ .filter(|event| future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message))
+ ))
+ .next()
+ .immediately()
+ .await
+ .is_none());
+}
+
+#[tokio::test]
+async fn terminates_on_logout() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber_token =
+ fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::now()).await;
+ let subscriber =
+ fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await;
+
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.tokens()
+ .logout(&subscriber.token)
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ assert!(events
+ .filter(|event| future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message))
+ ))
+ .next()
+ .immediately()
+ .await
+ .is_none());
+}