summaryrefslogtreecommitdiff
path: root/src/events/routes/test.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/routes/test.rs')
-rw-r--r--src/events/routes/test.rs368
1 files changed, 368 insertions, 0 deletions
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
new file mode 100644
index 0000000..df2d5f6
--- /dev/null
+++ b/src/events/routes/test.rs
@@ -0,0 +1,368 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+};
+
+use crate::{
+ channel::app,
+ events::routes,
+ repo::channel::{self},
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn no_subscriptions() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let subscriber = fixtures::login::create(&app).await;
+
+ // Call the endpoint
+
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery { channels: vec![] };
+ let routes::Events(mut events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("empty subscription");
+
+ // Verify the structure of the response.
+
+ assert!(events.next().immediately().await.is_none());
+}
+
+#[tokio::test]
+async fn includes_historical_message() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel = fixtures::channel::create(&app).await;
+ let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+ let routes::Events(mut events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to valid channel");
+
+ // Verify the structure of the response.
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("delivered stored message");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(message, event.message);
+}
+
+#[tokio::test]
+async fn includes_live_message() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+ let routes::Events(mut events) = routes::events(
+ State(app.clone()),
+ subscribed_at,
+ subscriber,
+ None,
+ Query(query),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ // Verify the semantics
+
+ let sender = fixtures::login::create(&app).await;
+ let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("delivered live message");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(message, event.message);
+}
+
+#[tokio::test]
+async fn excludes_other_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let subscribed = fixtures::channel::create(&app).await;
+ let unsubscribed = fixtures::channel::create(&app).await;
+ let sender = fixtures::login::create(&app).await;
+ let message = fixtures::message::send(&app, &sender, &subscribed, &fixtures::now()).await;
+ fixtures::message::send(&app, &sender, &unsubscribed, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![subscribed.id.clone()],
+ };
+ let routes::Events(mut events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to a valid channel");
+
+ // Verify the semantics
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("delivered at least one message");
+
+ assert_eq!(subscribed.id, event.channel);
+ assert_eq!(message, event.message);
+}
+
+#[tokio::test]
+async fn includes_multiple_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+
+ let channels = [
+ fixtures::channel::create(&app).await,
+ fixtures::channel::create(&app).await,
+ ];
+
+ let messages = stream::iter(channels)
+ .then(|channel| async {
+ let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ (channel, message)
+ })
+ .collect::<Vec<_>>()
+ .await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: messages
+ .iter()
+ .map(|(channel, _)| &channel.id)
+ .cloned()
+ .collect(),
+ };
+ let routes::Events(events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to valid channels");
+
+ // Verify the structure of the response.
+
+ let events = events
+ .take(messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for (channel, message) in messages {
+ assert!(events
+ .iter()
+ .any(|event| { event.channel == channel.id && event.message == message }));
+ }
+}
+
+#[tokio::test]
+async fn nonexitent_channel() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = channel::Id::generate();
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.clone()],
+ };
+ let routes::ErrorResponse(error) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect_err("subscribed to nonexistent channel");
+
+ // Verify the structure of the response.
+
+ fixtures::error::expected!(
+ error,
+ app::EventsError::ChannelNotFound(error_channel),
+ assert_eq!(channel, error_channel)
+ );
+}
+
+#[tokio::test]
+async fn sequential_messages() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app).await;
+ let sender = fixtures::login::create(&app).await;
+
+ let messages = vec![
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+ let routes::Events(events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to a valid channel");
+
+ // Verify the structure of the response.
+
+ let mut events = events.filter(|event| future::ready(messages.contains(&event.message)));
+
+ // Verify delivery in order
+ for message in &messages {
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("undelivered messages remaining");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(message, &event.message);
+ }
+}
+
+#[tokio::test]
+async fn resumes_from() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app).await;
+ let sender = fixtures::login::create(&app).await;
+
+ let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ let later_messages = vec![
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+
+ let resume_at = {
+ // First subscription
+ let routes::Events(mut events) = routes::events(
+ State(app.clone()),
+ subscribed_at,
+ subscriber.clone(),
+ None,
+ Query(query.clone()),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ let event = events.next().immediately().await.expect("delivered events");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(initial_message, event.message);
+
+ event.event_id()
+ };
+
+ // Resume after disconnect
+ let resumed_at = fixtures::now();
+ let routes::Events(resumed) = routes::events(
+ State(app),
+ resumed_at,
+ subscriber,
+ Some(resume_at.into()),
+ Query(query),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ // Verify the structure of the response.
+
+ let events = resumed
+ .take(later_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in later_messages {
+ assert!(events
+ .iter()
+ .any(|event| event.channel == channel.id && event.message == message));
+ }
+}
+
+#[tokio::test]
+async fn removes_expired_messages() {
+ // Set up the environment
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel = fixtures::channel::create(&app).await;
+
+ fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+ let routes::Events(mut events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to valid channel");
+
+ // Verify the semantics
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("delivered messages");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(message, event.message);
+}