diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-27 18:17:02 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-27 19:59:22 -0400 |
| commit | eff129bc1f29bcb1b2b9d10c6b49ab886edc83d6 (patch) | |
| tree | b82892a6cf40f771998a85e5530012bab80157dc /src/events/routes | |
| parent | 68e3dce3c2e588376c6510783e908941360ac80e (diff) | |
Make `/api/events` a firehose endpoint.
It now includes events for all channels. Clients are responsible for filtering.
The schema for channel events has changed; it now includes a channel name and ID, in the same format as the sender's name and ID. They also now include a `"type"` field, whose only valid value (as of this writing) is `"message"`.
This is groundwork for delivering message deletion (expiry) events to clients, and notifying clients of channel lifecycle events.
Diffstat (limited to 'src/events/routes')
| -rw-r--r-- | src/events/routes/test.rs | 276 |
1 files changed, 66 insertions, 210 deletions
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index 4412938..f289225 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -1,40 +1,15 @@ use axum::extract::State; -use axum_extra::extract::Query; use futures::{ future, stream::{self, StreamExt as _}, }; use crate::{ - events::{app, routes}, - repo::channel::{self}, + events::{routes, types}, test::fixtures::{self, future::Immediately as _}, }; #[tokio::test] -async fn no_subscriptions() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let subscriber = fixtures::login::create(&app).await; - - // Call the endpoint - - let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [].into(), - }; - let routes::Events(mut events) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect("empty subscription"); - - // Verify the structure of the response. - - assert!(events.next().immediately().await.is_none()); -} - -#[tokio::test] async fn includes_historical_message() { // Set up the environment @@ -47,24 +22,19 @@ async fn includes_historical_message() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [channel.id.clone()].into(), - }; - let routes::Events(mut events) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect("subscribed to valid channel"); + let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None) + .await + .expect("subscribe never fails"); // Verify the structure of the response. - let routes::ReplayableEvent(_, event) = events + let types::ResumableEvent(_, event) = events .next() .immediately() .await .expect("delivered stored message"); - assert_eq!(channel.id, event.channel); - assert_eq!(message, event.message); + assert_eq!(message, event); } #[tokio::test] @@ -78,68 +48,23 @@ async fn includes_live_message() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [channel.id.clone()].into(), - }; - let routes::Events(mut events) = routes::events( - State(app.clone()), - subscribed_at, - subscriber, - None, - Query(query), - ) - .await - .expect("subscribed to a valid channel"); + let routes::Events(mut events) = + routes::events(State(app.clone()), subscribed_at, subscriber, None) + .await + .expect("subscribe never fails"); // Verify the semantics let sender = fixtures::login::create(&app).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - let routes::ReplayableEvent(_, event) = events + let types::ResumableEvent(_, event) = events .next() .immediately() .await .expect("delivered live message"); - assert_eq!(channel.id, event.channel); - assert_eq!(message, event.message); -} - -#[tokio::test] -async fn excludes_other_channels() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let subscribed_channel = fixtures::channel::create(&app).await; - let unsubscribed_channel = fixtures::channel::create(&app).await; - let sender = fixtures::login::create(&app).await; - let message = - fixtures::message::send(&app, &sender, &subscribed_channel, &fixtures::now()).await; - fixtures::message::send(&app, &sender, &unsubscribed_channel, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [subscribed_channel.id.clone()].into(), - }; - let routes::Events(mut events) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect("subscribed to a valid channel"); - - // Verify the semantics - - let routes::ReplayableEvent(_, event) = events - .next() - .immediately() - .await - .expect("delivered at least one message"); - - assert_eq!(subscribed_channel.id, event.channel); - assert_eq!(message, event.message); + assert_eq!(message, event); } #[tokio::test] @@ -155,10 +80,11 @@ async fn includes_multiple_channels() { ]; let messages = stream::iter(channels) - .then(|channel| async { - let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - - (channel, message) + .then(|channel| { + let app = app.clone(); + let sender = sender.clone(); + let channel = channel.clone(); + async move { fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await } }) .collect::<Vec<_>>() .await; @@ -167,17 +93,9 @@ async fn includes_multiple_channels() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: messages - .iter() - .map(|(channel, _)| &channel.id) - .cloned() - .collect(), - }; - let routes::Events(events) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect("subscribed to valid channels"); + let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None) + .await + .expect("subscribe never fails"); // Verify the structure of the response. @@ -187,41 +105,14 @@ async fn includes_multiple_channels() { .immediately() .await; - for (channel, message) in messages { - assert!(events.iter().any(|routes::ReplayableEvent(_, event)| { - event.channel == channel.id && event.message == message - })); + for message in &messages { + assert!(events + .iter() + .any(|types::ResumableEvent(_, event)| { event == message })); } } #[tokio::test] -async fn nonexistent_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = channel::Id::generate(); - - // Call the endpoint - - let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [channel.clone()].into(), - }; - let routes::ErrorResponse(error) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect_err("subscribed to nonexistent channel"); - - // Verify the structure of the response. - - assert!(matches!( - error, - app::EventsError::ChannelNotFound(error_channel) if error_channel == channel - )); -} - -#[tokio::test] async fn sequential_messages() { // Set up the environment @@ -239,30 +130,24 @@ async fn sequential_messages() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [channel.id.clone()].into(), - }; - let routes::Events(events) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect("subscribed to a valid channel"); + let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None) + .await + .expect("subscribe never fails"); // Verify the structure of the response. - let mut events = events.filter(|routes::ReplayableEvent(_, event)| { - future::ready(messages.contains(&event.message)) - }); + let mut events = + events.filter(|types::ResumableEvent(_, event)| future::ready(messages.contains(event))); // Verify delivery in order for message in &messages { - let routes::ReplayableEvent(_, event) = events + let types::ResumableEvent(_, event) = events .next() .immediately() .await .expect("undelivered messages remaining"); - assert_eq!(channel.id, event.channel); - assert_eq!(message, &event.message); + assert_eq!(message, &event); } } @@ -285,42 +170,28 @@ async fn resumes_from() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [channel.id.clone()].into(), - }; let resume_at = { // First subscription - let routes::Events(mut events) = routes::events( - State(app.clone()), - subscribed_at, - subscriber.clone(), - None, - Query(query.clone()), - ) - .await - .expect("subscribed to a valid channel"); + let routes::Events(mut events) = + routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None) + .await + .expect("subscribe never fails"); - let routes::ReplayableEvent(id, event) = + let types::ResumableEvent(last_event_id, event) = events.next().immediately().await.expect("delivered events"); - assert_eq!(channel.id, event.channel); - assert_eq!(initial_message, event.message); + assert_eq!(initial_message, event); - id + last_event_id }; // Resume after disconnect let reconnect_at = fixtures::now(); - let routes::Events(resumed) = routes::events( - State(app), - reconnect_at, - subscriber, - Some(resume_at.into()), - Query(query), - ) - .await - .expect("subscribed to a valid channel"); + let routes::Events(resumed) = + routes::events(State(app), reconnect_at, subscriber, Some(resume_at.into())) + .await + .expect("subscribe never fails"); // Verify the structure of the response. @@ -330,11 +201,10 @@ async fn resumes_from() { .immediately() .await; - for message in later_messages { - assert!(events.iter().any( - |routes::ReplayableEvent(_, event)| event.channel == channel.id - && event.message == message - )); + for message in &later_messages { + assert!(events + .iter() + .any(|types::ResumableEvent(_, event)| event == message)); } } @@ -365,9 +235,6 @@ async fn serial_resume() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let query = routes::EventsQuery { - channels: [channel_a.id.clone(), channel_b.id.clone()].into(), - }; let resume_at = { let initial_messages = [ @@ -377,15 +244,10 @@ async fn serial_resume() { // First subscription let subscribed_at = fixtures::now(); - let routes::Events(events) = routes::events( - State(app.clone()), - subscribed_at, - subscriber.clone(), - None, - Query(query.clone()), - ) - .await - .expect("subscribed to a valid channel"); + let routes::Events(events) = + routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None) + .await + .expect("subscribe never fails"); let events = events .take(initial_messages.len()) @@ -393,13 +255,13 @@ async fn serial_resume() { .immediately() .await; - for message in initial_messages { + for message in &initial_messages { assert!(events .iter() - .any(|routes::ReplayableEvent(_, event)| event.message == message)); + .any(|types::ResumableEvent(_, event)| event == message)); } - let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty"); + let types::ResumableEvent(id, _) = events.last().expect("this vec is non-empty"); id.to_owned() }; @@ -421,10 +283,9 @@ async fn serial_resume() { resubscribed_at, subscriber.clone(), Some(resume_at.into()), - Query(query.clone()), ) .await - .expect("subscribed to a valid channel"); + .expect("subscribe never fails"); let events = events .take(resume_messages.len()) @@ -432,13 +293,13 @@ async fn serial_resume() { .immediately() .await; - for message in resume_messages { + for message in &resume_messages { assert!(events .iter() - .any(|routes::ReplayableEvent(_, event)| event.message == message)); + .any(|types::ResumableEvent(_, event)| event == message)); } - let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty"); + let types::ResumableEvent(id, _) = events.last().expect("this vec is non-empty"); id.to_owned() }; @@ -460,10 +321,9 @@ async fn serial_resume() { resubscribed_at, subscriber.clone(), Some(resume_at.into()), - Query(query.clone()), ) .await - .expect("subscribed to a valid channel"); + .expect("subscribe never fails"); let events = events .take(final_messages.len()) @@ -473,10 +333,10 @@ async fn serial_resume() { // This set of messages, in particular, _should not_ include any prior // messages from `initial_messages` or `resume_messages`. - for message in final_messages { + for message in &final_messages { assert!(events .iter() - .any(|routes::ReplayableEvent(_, event)| event.message == message)); + .any(|types::ResumableEvent(_, event)| event == message)); } }; } @@ -495,22 +355,18 @@ async fn removes_expired_messages() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [channel.id.clone()].into(), - }; - let routes::Events(mut events) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect("subscribed to valid channel"); + + let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None) + .await + .expect("subscribe never fails"); // Verify the semantics - let routes::ReplayableEvent(_, event) = events + let types::ResumableEvent(_, event) = events .next() .immediately() .await .expect("delivered messages"); - assert_eq!(channel.id, event.channel); - assert_eq!(message, event.message); + assert_eq!(message, event); } |
