diff options
Diffstat (limited to 'src/events')
| -rw-r--r-- | src/events/app.rs | 5 | ||||
| -rw-r--r-- | src/events/repo/broadcast.rs | 2 | ||||
| -rw-r--r-- | src/events/routes.rs | 11 | ||||
| -rw-r--r-- | src/events/routes/test.rs | 368 |
4 files changed, 381 insertions, 5 deletions
diff --git a/src/events/app.rs b/src/events/app.rs index c3a027d..99e849e 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -69,7 +69,10 @@ impl Broadcaster { // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. - pub fn listen(&self, channel: &channel::Id) -> impl Stream<Item = broadcast::Message> { + pub fn listen( + &self, + channel: &channel::Id, + ) -> impl Stream<Item = broadcast::Message> + std::fmt::Debug { let rx = self.sender(channel).subscribe(); BroadcastStream::from(rx) diff --git a/src/events/repo/broadcast.rs b/src/events/repo/broadcast.rs index 182203a..bffe991 100644 --- a/src/events/repo/broadcast.rs +++ b/src/events/repo/broadcast.rs @@ -21,7 +21,7 @@ impl<'c> Provider for Transaction<'c, Sqlite> { pub struct Broadcast<'t>(&'t mut SqliteConnection); -#[derive(Clone, Debug, serde::Serialize)] +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Message { pub id: message::Id, pub sender: Login, diff --git a/src/events/routes.rs b/src/events/routes.rs index ce5b778..a6bf5d9 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -22,11 +22,14 @@ use crate::{ repo::{channel, login::Login}, }; +#[cfg(test)] +mod test; + pub fn router() -> Router<App> { Router::new().route("/api/events", get(events)) } -#[derive(serde::Deserialize)] +#[derive(Clone, serde::Deserialize)] struct EventsQuery { #[serde(default, rename = "channel")] channels: Vec<channel::Id>, @@ -38,7 +41,7 @@ async fn events( _: Login, // requires auth, but doesn't actually care who you are last_event_id: Option<LastEventId>, Query(query): Query<EventsQuery>, -) -> Result<Events<impl Stream<Item = ChannelEvent>>, ErrorResponse> { +) -> Result<Events<impl Stream<Item = ChannelEvent> + std::fmt::Debug>, ErrorResponse> { let resume_at = last_event_id.as_deref(); let streams = stream::iter(query.channels) @@ -64,6 +67,7 @@ async fn events( Ok(Events(stream)) } +#[derive(Debug)] struct Events<S>(S); impl<S> IntoResponse for Events<S> @@ -79,6 +83,7 @@ where } } +#[derive(Debug)] struct ErrorResponse(EventsError); impl IntoResponse for ErrorResponse { @@ -96,7 +101,7 @@ impl IntoResponse for ErrorResponse { } } -#[derive(serde::Serialize)] +#[derive(Debug, serde::Serialize)] struct ChannelEvent { channel: channel::Id, #[serde(flatten)] diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs new file mode 100644 index 0000000..df2d5f6 --- /dev/null +++ b/src/events/routes/test.rs @@ -0,0 +1,368 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{ + future, + stream::{self, StreamExt as _}, +}; + +use crate::{ + channel::app, + events::routes, + repo::channel::{self}, + test::fixtures::{self, future::Immediately as _}, +}; + +#[tokio::test] +async fn no_subscriptions() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let subscriber = fixtures::login::create(&app).await; + + // Call the endpoint + + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { channels: vec![] }; + let routes::Events(mut events) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect("empty subscription"); + + // Verify the structure of the response. + + assert!(events.next().immediately().await.is_none()); +} + +#[tokio::test] +async fn includes_historical_message() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app).await; + let channel = fixtures::channel::create(&app).await; + let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![channel.id.clone()], + }; + let routes::Events(mut events) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect("subscribed to valid channel"); + + // Verify the structure of the response. + + let event = events + .next() + .immediately() + .await + .expect("delivered stored message"); + + assert_eq!(channel.id, event.channel); + assert_eq!(message, event.message); +} + +#[tokio::test] +async fn includes_live_message() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app).await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![channel.id.clone()], + }; + let routes::Events(mut events) = routes::events( + State(app.clone()), + subscribed_at, + subscriber, + None, + Query(query), + ) + .await + .expect("subscribed to a valid channel"); + + // Verify the semantics + + let sender = fixtures::login::create(&app).await; + let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + + let event = events + .next() + .immediately() + .await + .expect("delivered live message"); + + assert_eq!(channel.id, event.channel); + assert_eq!(message, event.message); +} + +#[tokio::test] +async fn excludes_other_channels() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let subscribed = fixtures::channel::create(&app).await; + let unsubscribed = fixtures::channel::create(&app).await; + let sender = fixtures::login::create(&app).await; + let message = fixtures::message::send(&app, &sender, &subscribed, &fixtures::now()).await; + fixtures::message::send(&app, &sender, &unsubscribed, &fixtures::now()).await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![subscribed.id.clone()], + }; + let routes::Events(mut events) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect("subscribed to a valid channel"); + + // Verify the semantics + + let event = events + .next() + .immediately() + .await + .expect("delivered at least one message"); + + assert_eq!(subscribed.id, event.channel); + assert_eq!(message, event.message); +} + +#[tokio::test] +async fn includes_multiple_channels() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app).await; + + let channels = [ + fixtures::channel::create(&app).await, + fixtures::channel::create(&app).await, + ]; + + let messages = stream::iter(channels) + .then(|channel| async { + let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + + (channel, message) + }) + .collect::<Vec<_>>() + .await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: messages + .iter() + .map(|(channel, _)| &channel.id) + .cloned() + .collect(), + }; + let routes::Events(events) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect("subscribed to valid channels"); + + // Verify the structure of the response. + + let events = events + .take(messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + for (channel, message) in messages { + assert!(events + .iter() + .any(|event| { event.channel == channel.id && event.message == message })); + } +} + +#[tokio::test] +async fn nonexitent_channel() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = channel::Id::generate(); + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![channel.clone()], + }; + let routes::ErrorResponse(error) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect_err("subscribed to nonexistent channel"); + + // Verify the structure of the response. + + fixtures::error::expected!( + error, + app::EventsError::ChannelNotFound(error_channel), + assert_eq!(channel, error_channel) + ); +} + +#[tokio::test] +async fn sequential_messages() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app).await; + let sender = fixtures::login::create(&app).await; + + let messages = vec![ + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + ]; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![channel.id.clone()], + }; + let routes::Events(events) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect("subscribed to a valid channel"); + + // Verify the structure of the response. + + let mut events = events.filter(|event| future::ready(messages.contains(&event.message))); + + // Verify delivery in order + for message in &messages { + let event = events + .next() + .immediately() + .await + .expect("undelivered messages remaining"); + + assert_eq!(channel.id, event.channel); + assert_eq!(message, &event.message); + } +} + +#[tokio::test] +async fn resumes_from() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app).await; + let sender = fixtures::login::create(&app).await; + + let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + + let later_messages = vec![ + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + ]; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![channel.id.clone()], + }; + + let resume_at = { + // First subscription + let routes::Events(mut events) = routes::events( + State(app.clone()), + subscribed_at, + subscriber.clone(), + None, + Query(query.clone()), + ) + .await + .expect("subscribed to a valid channel"); + + let event = events.next().immediately().await.expect("delivered events"); + + assert_eq!(channel.id, event.channel); + assert_eq!(initial_message, event.message); + + event.event_id() + }; + + // Resume after disconnect + let resumed_at = fixtures::now(); + let routes::Events(resumed) = routes::events( + State(app), + resumed_at, + subscriber, + Some(resume_at.into()), + Query(query), + ) + .await + .expect("subscribed to a valid channel"); + + // Verify the structure of the response. + + let events = resumed + .take(later_messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + for message in later_messages { + assert!(events + .iter() + .any(|event| event.channel == channel.id && event.message == message)); + } +} + +#[tokio::test] +async fn removes_expired_messages() { + // Set up the environment + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app).await; + let channel = fixtures::channel::create(&app).await; + + fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let subscribed_at = fixtures::now(); + let query = routes::EventsQuery { + channels: vec![channel.id.clone()], + }; + let routes::Events(mut events) = + routes::events(State(app), subscribed_at, subscriber, None, Query(query)) + .await + .expect("subscribed to valid channel"); + + // Verify the semantics + + let event = events + .next() + .immediately() + .await + .expect("delivered messages"); + + assert_eq!(channel.id, event.channel); + assert_eq!(message, event.message); +} |
