summaryrefslogtreecommitdiff
path: root/src/events/routes/test.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/routes/test.rs')
-rw-r--r--src/events/routes/test.rs439
1 files changed, 0 insertions, 439 deletions
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
deleted file mode 100644
index 11f01b8..0000000
--- a/src/events/routes/test.rs
+++ /dev/null
@@ -1,439 +0,0 @@
-use axum::extract::State;
-use axum_extra::extract::Query;
-use futures::{
- future,
- stream::{self, StreamExt as _},
-};
-
-use crate::{
- events::routes,
- test::fixtures::{self, future::Immediately as _},
-};
-
-#[tokio::test]
-async fn includes_historical_message() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
- let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let event = events
- .filter(fixtures::filter::messages())
- .next()
- .immediately()
- .await
- .expect("delivered stored message");
-
- assert_eq!(message, event);
-}
-
-#[tokio::test]
-async fn includes_live_message() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
- let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) =
- routes::events(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the semantics
-
- let sender = fixtures::login::create(&app).await;
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
-
- let event = events
- .filter(fixtures::filter::messages())
- .next()
- .immediately()
- .await
- .expect("delivered live message");
-
- assert_eq!(message, event);
-}
-
-#[tokio::test]
-async fn includes_multiple_channels() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
-
- let channels = [
- fixtures::channel::create(&app, &fixtures::now()).await,
- fixtures::channel::create(&app, &fixtures::now()).await,
- ];
-
- let messages = stream::iter(channels)
- .then(|channel| {
- let app = app.clone();
- let sender = sender.clone();
- let channel = channel.clone();
- async move { fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await }
- })
- .collect::<Vec<_>>()
- .await;
-
- // Call the endpoint
-
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
- let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let events = events
- .filter(fixtures::filter::messages())
- .take(messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- for message in &messages {
- assert!(events.iter().any(|event| { event == message }));
- }
-}
-
-#[tokio::test]
-async fn sequential_messages() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app).await;
-
- let messages = vec![
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- ];
-
- // Call the endpoint
-
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
- let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let mut events = events.filter(|event| future::ready(messages.contains(event)));
-
- // Verify delivery in order
- for message in &messages {
- let event = events
- .next()
- .immediately()
- .await
- .expect("undelivered messages remaining");
-
- assert_eq!(message, &event);
- }
-}
-
-#[tokio::test]
-async fn resumes_from() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app).await;
-
- let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
-
- let later_messages = vec![
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- ];
-
- // Call the endpoint
-
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
- let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
-
- let resume_at = {
- // First subscription
- let routes::Events(events) = routes::events(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- let event = events
- .filter(fixtures::filter::messages())
- .next()
- .immediately()
- .await
- .expect("delivered events");
-
- assert_eq!(initial_message, event);
-
- event.sequence
- };
-
- // Resume after disconnect
- let routes::Events(resumed) = routes::events(
- State(app),
- subscriber,
- Some(resume_at.into()),
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify the structure of the response.
-
- let events = resumed
- .take(later_messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- for message in &later_messages {
- assert!(events.iter().any(|event| event == message));
- }
-}
-
-// This test verifies a real bug I hit developing the vector-of-sequences
-// approach to resuming events. A small omission caused the event IDs in a
-// resumed stream to _omit_ channels that were in the original stream until
-// those channels also appeared in the resumed stream.
-//
-// Clients would see something like
-// * In the original stream, Cfoo=5,Cbar=8
-// * In the resumed stream, Cfoo=6 (no Cbar sequence number)
-//
-// Disconnecting and reconnecting a second time, using event IDs from that
-// initial period of the first resume attempt, would then cause the second
-// resume attempt to restart all other channels from the beginning, and not
-// from where the first disconnection happened.
-//
-// This is a real and valid behaviour for clients!
-#[tokio::test]
-async fn serial_resume() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
- let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
- let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
- let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
-
- let resume_at = {
- let initial_messages = [
- fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
- ];
-
- // First subscription
- let routes::Events(events) = routes::events(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- let events = events
- .filter(fixtures::filter::messages())
- .take(initial_messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- for message in &initial_messages {
- assert!(events.iter().any(|event| event == message));
- }
-
- let event = events.last().expect("this vec is non-empty");
-
- event.sequence
- };
-
- // Resume after disconnect
- let resume_at = {
- let resume_messages = [
- // Note that channel_b does not appear here. The buggy behaviour
- // would be masked if channel_b happened to send a new message
- // into the resumed event stream.
- fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
- ];
-
- // Second subscription
- let routes::Events(events) = routes::events(
- State(app.clone()),
- subscriber.clone(),
- Some(resume_at.into()),
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- let events = events
- .filter(fixtures::filter::messages())
- .take(resume_messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- for message in &resume_messages {
- assert!(events.iter().any(|event| event == message));
- }
-
- let event = events.last().expect("this vec is non-empty");
-
- event.sequence
- };
-
- // Resume after disconnect a second time
- {
- // At this point, we can send on either channel and demonstrate the
- // problem. The resume point should before both of these messages, but
- // after _all_ prior messages.
- let final_messages = [
- fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
- ];
-
- // Third subscription
- let routes::Events(events) = routes::events(
- State(app.clone()),
- subscriber.clone(),
- Some(resume_at.into()),
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- let events = events
- .filter(fixtures::filter::messages())
- .take(final_messages.len())
- .collect::<Vec<_>>()
- .immediately()
- .await;
-
- // This set of messages, in particular, _should not_ include any prior
- // messages from `initial_messages` or `resume_messages`.
- for message in &final_messages {
- assert!(events.iter().any(|event| event == message));
- }
- };
-}
-
-#[tokio::test]
-async fn terminates_on_token_expiry() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app).await;
-
- // Subscribe via the endpoint
-
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
- let subscriber =
- fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await;
-
- let routes::Events(events) =
- routes::events(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
-
- // Verify the resulting stream's behaviour
-
- app.logins()
- .expire(&fixtures::now())
- .await
- .expect("expiring tokens succeeds");
-
- // These should not be delivered.
- let messages = [
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- ];
-
- assert!(events
- .filter(|event| future::ready(messages.contains(event)))
- .next()
- .immediately()
- .await
- .is_none());
-}
-
-#[tokio::test]
-async fn terminates_on_logout() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app).await;
-
- // Subscribe via the endpoint
-
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
- let subscriber_token =
- fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::now()).await;
- let subscriber =
- fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await;
-
- let routes::Events(events) = routes::events(
- State(app.clone()),
- subscriber.clone(),
- None,
- Query::default(),
- )
- .await
- .expect("subscribe never fails");
-
- // Verify the resulting stream's behaviour
-
- app.logins()
- .logout(&subscriber.token)
- .await
- .expect("expiring tokens succeeds");
-
- // These should not be delivered.
- let messages = [
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- ];
-
- assert!(events
- .filter(|event| future::ready(messages.contains(event)))
- .next()
- .immediately()
- .await
- .is_none());
-}