summaryrefslogtreecommitdiff
path: root/src/events
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-19 01:25:31 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-20 23:55:22 -0400
commite5f72711c5a17c5db24e209b14f82d426eceb86e (patch)
tree04865172284c86549dd08d700c21a29c36f54005 /src/events
parent0079624488af334817f58e30dbc676d3adde8de6 (diff)
Write tests.
Diffstat (limited to 'src/events')
-rw-r--r--src/events/app.rs5
-rw-r--r--src/events/repo/broadcast.rs2
-rw-r--r--src/events/routes.rs11
-rw-r--r--src/events/routes/test.rs368
4 files changed, 381 insertions, 5 deletions
diff --git a/src/events/app.rs b/src/events/app.rs
index c3a027d..99e849e 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -69,7 +69,10 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn listen(&self, channel: &channel::Id) -> impl Stream<Item = broadcast::Message> {
+ pub fn listen(
+ &self,
+ channel: &channel::Id,
+ ) -> impl Stream<Item = broadcast::Message> + std::fmt::Debug {
let rx = self.sender(channel).subscribe();
BroadcastStream::from(rx)
diff --git a/src/events/repo/broadcast.rs b/src/events/repo/broadcast.rs
index 182203a..bffe991 100644
--- a/src/events/repo/broadcast.rs
+++ b/src/events/repo/broadcast.rs
@@ -21,7 +21,7 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Broadcast<'t>(&'t mut SqliteConnection);
-#[derive(Clone, Debug, serde::Serialize)]
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Message {
pub id: message::Id,
pub sender: Login,
diff --git a/src/events/routes.rs b/src/events/routes.rs
index ce5b778..a6bf5d9 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -22,11 +22,14 @@ use crate::{
repo::{channel, login::Login},
};
+#[cfg(test)]
+mod test;
+
pub fn router() -> Router<App> {
Router::new().route("/api/events", get(events))
}
-#[derive(serde::Deserialize)]
+#[derive(Clone, serde::Deserialize)]
struct EventsQuery {
#[serde(default, rename = "channel")]
channels: Vec<channel::Id>,
@@ -38,7 +41,7 @@ async fn events(
_: Login, // requires auth, but doesn't actually care who you are
last_event_id: Option<LastEventId>,
Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = ChannelEvent>>, ErrorResponse> {
+) -> Result<Events<impl Stream<Item = ChannelEvent> + std::fmt::Debug>, ErrorResponse> {
let resume_at = last_event_id.as_deref();
let streams = stream::iter(query.channels)
@@ -64,6 +67,7 @@ async fn events(
Ok(Events(stream))
}
+#[derive(Debug)]
struct Events<S>(S);
impl<S> IntoResponse for Events<S>
@@ -79,6 +83,7 @@ where
}
}
+#[derive(Debug)]
struct ErrorResponse(EventsError);
impl IntoResponse for ErrorResponse {
@@ -96,7 +101,7 @@ impl IntoResponse for ErrorResponse {
}
}
-#[derive(serde::Serialize)]
+#[derive(Debug, serde::Serialize)]
struct ChannelEvent {
channel: channel::Id,
#[serde(flatten)]
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
new file mode 100644
index 0000000..df2d5f6
--- /dev/null
+++ b/src/events/routes/test.rs
@@ -0,0 +1,368 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+};
+
+use crate::{
+ channel::app,
+ events::routes,
+ repo::channel::{self},
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn no_subscriptions() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let subscriber = fixtures::login::create(&app).await;
+
+ // Call the endpoint
+
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery { channels: vec![] };
+ let routes::Events(mut events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("empty subscription");
+
+ // Verify the structure of the response.
+
+ assert!(events.next().immediately().await.is_none());
+}
+
+#[tokio::test]
+async fn includes_historical_message() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel = fixtures::channel::create(&app).await;
+ let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+ let routes::Events(mut events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to valid channel");
+
+ // Verify the structure of the response.
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("delivered stored message");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(message, event.message);
+}
+
+#[tokio::test]
+async fn includes_live_message() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+ let routes::Events(mut events) = routes::events(
+ State(app.clone()),
+ subscribed_at,
+ subscriber,
+ None,
+ Query(query),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ // Verify the semantics
+
+ let sender = fixtures::login::create(&app).await;
+ let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("delivered live message");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(message, event.message);
+}
+
+#[tokio::test]
+async fn excludes_other_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let subscribed = fixtures::channel::create(&app).await;
+ let unsubscribed = fixtures::channel::create(&app).await;
+ let sender = fixtures::login::create(&app).await;
+ let message = fixtures::message::send(&app, &sender, &subscribed, &fixtures::now()).await;
+ fixtures::message::send(&app, &sender, &unsubscribed, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![subscribed.id.clone()],
+ };
+ let routes::Events(mut events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to a valid channel");
+
+ // Verify the semantics
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("delivered at least one message");
+
+ assert_eq!(subscribed.id, event.channel);
+ assert_eq!(message, event.message);
+}
+
+#[tokio::test]
+async fn includes_multiple_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+
+ let channels = [
+ fixtures::channel::create(&app).await,
+ fixtures::channel::create(&app).await,
+ ];
+
+ let messages = stream::iter(channels)
+ .then(|channel| async {
+ let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ (channel, message)
+ })
+ .collect::<Vec<_>>()
+ .await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: messages
+ .iter()
+ .map(|(channel, _)| &channel.id)
+ .cloned()
+ .collect(),
+ };
+ let routes::Events(events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to valid channels");
+
+ // Verify the structure of the response.
+
+ let events = events
+ .take(messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for (channel, message) in messages {
+ assert!(events
+ .iter()
+ .any(|event| { event.channel == channel.id && event.message == message }));
+ }
+}
+
+#[tokio::test]
+async fn nonexitent_channel() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = channel::Id::generate();
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.clone()],
+ };
+ let routes::ErrorResponse(error) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect_err("subscribed to nonexistent channel");
+
+ // Verify the structure of the response.
+
+ fixtures::error::expected!(
+ error,
+ app::EventsError::ChannelNotFound(error_channel),
+ assert_eq!(channel, error_channel)
+ );
+}
+
+#[tokio::test]
+async fn sequential_messages() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app).await;
+ let sender = fixtures::login::create(&app).await;
+
+ let messages = vec![
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+ let routes::Events(events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to a valid channel");
+
+ // Verify the structure of the response.
+
+ let mut events = events.filter(|event| future::ready(messages.contains(&event.message)));
+
+ // Verify delivery in order
+ for message in &messages {
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("undelivered messages remaining");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(message, &event.message);
+ }
+}
+
+#[tokio::test]
+async fn resumes_from() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app).await;
+ let sender = fixtures::login::create(&app).await;
+
+ let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ let later_messages = vec![
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+
+ let resume_at = {
+ // First subscription
+ let routes::Events(mut events) = routes::events(
+ State(app.clone()),
+ subscribed_at,
+ subscriber.clone(),
+ None,
+ Query(query.clone()),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ let event = events.next().immediately().await.expect("delivered events");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(initial_message, event.message);
+
+ event.event_id()
+ };
+
+ // Resume after disconnect
+ let resumed_at = fixtures::now();
+ let routes::Events(resumed) = routes::events(
+ State(app),
+ resumed_at,
+ subscriber,
+ Some(resume_at.into()),
+ Query(query),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ // Verify the structure of the response.
+
+ let events = resumed
+ .take(later_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in later_messages {
+ assert!(events
+ .iter()
+ .any(|event| event.channel == channel.id && event.message == message));
+ }
+}
+
+#[tokio::test]
+async fn removes_expired_messages() {
+ // Set up the environment
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel = fixtures::channel::create(&app).await;
+
+ fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+ let routes::Events(mut events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to valid channel");
+
+ // Verify the semantics
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("delivered messages");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(message, event.message);
+}