summaryrefslogtreecommitdiff
path: root/src/events/routes
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-27 18:17:02 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-27 19:59:22 -0400
commiteff129bc1f29bcb1b2b9d10c6b49ab886edc83d6 (patch)
treeb82892a6cf40f771998a85e5530012bab80157dc /src/events/routes
parent68e3dce3c2e588376c6510783e908941360ac80e (diff)
Make `/api/events` a firehose endpoint.
It now includes events for all channels. Clients are responsible for filtering. The schema for channel events has changed; it now includes a channel name and ID, in the same format as the sender's name and ID. They also now include a `"type"` field, whose only valid value (as of this writing) is `"message"`. This is groundwork for delivering message deletion (expiry) events to clients, and notifying clients of channel lifecycle events.
Diffstat (limited to 'src/events/routes')
-rw-r--r--src/events/routes/test.rs276
1 files changed, 66 insertions, 210 deletions
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index 4412938..f289225 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -1,40 +1,15 @@
use axum::extract::State;
-use axum_extra::extract::Query;
use futures::{
future,
stream::{self, StreamExt as _},
};
use crate::{
- events::{app, routes},
- repo::channel::{self},
+ events::{routes, types},
test::fixtures::{self, future::Immediately as _},
};
#[tokio::test]
-async fn no_subscriptions() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let subscriber = fixtures::login::create(&app).await;
-
- // Call the endpoint
-
- let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [].into(),
- };
- let routes::Events(mut events) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect("empty subscription");
-
- // Verify the structure of the response.
-
- assert!(events.next().immediately().await.is_none());
-}
-
-#[tokio::test]
async fn includes_historical_message() {
// Set up the environment
@@ -47,24 +22,19 @@ async fn includes_historical_message() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [channel.id.clone()].into(),
- };
- let routes::Events(mut events) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect("subscribed to valid channel");
+ let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
- let routes::ReplayableEvent(_, event) = events
+ let types::ResumableEvent(_, event) = events
.next()
.immediately()
.await
.expect("delivered stored message");
- assert_eq!(channel.id, event.channel);
- assert_eq!(message, event.message);
+ assert_eq!(message, event);
}
#[tokio::test]
@@ -78,68 +48,23 @@ async fn includes_live_message() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [channel.id.clone()].into(),
- };
- let routes::Events(mut events) = routes::events(
- State(app.clone()),
- subscribed_at,
- subscriber,
- None,
- Query(query),
- )
- .await
- .expect("subscribed to a valid channel");
+ let routes::Events(mut events) =
+ routes::events(State(app.clone()), subscribed_at, subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the semantics
let sender = fixtures::login::create(&app).await;
let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
- let routes::ReplayableEvent(_, event) = events
+ let types::ResumableEvent(_, event) = events
.next()
.immediately()
.await
.expect("delivered live message");
- assert_eq!(channel.id, event.channel);
- assert_eq!(message, event.message);
-}
-
-#[tokio::test]
-async fn excludes_other_channels() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let subscribed_channel = fixtures::channel::create(&app).await;
- let unsubscribed_channel = fixtures::channel::create(&app).await;
- let sender = fixtures::login::create(&app).await;
- let message =
- fixtures::message::send(&app, &sender, &subscribed_channel, &fixtures::now()).await;
- fixtures::message::send(&app, &sender, &unsubscribed_channel, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [subscribed_channel.id.clone()].into(),
- };
- let routes::Events(mut events) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect("subscribed to a valid channel");
-
- // Verify the semantics
-
- let routes::ReplayableEvent(_, event) = events
- .next()
- .immediately()
- .await
- .expect("delivered at least one message");
-
- assert_eq!(subscribed_channel.id, event.channel);
- assert_eq!(message, event.message);
+ assert_eq!(message, event);
}
#[tokio::test]
@@ -155,10 +80,11 @@ async fn includes_multiple_channels() {
];
let messages = stream::iter(channels)
- .then(|channel| async {
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
-
- (channel, message)
+ .then(|channel| {
+ let app = app.clone();
+ let sender = sender.clone();
+ let channel = channel.clone();
+ async move { fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await }
})
.collect::<Vec<_>>()
.await;
@@ -167,17 +93,9 @@ async fn includes_multiple_channels() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: messages
- .iter()
- .map(|(channel, _)| &channel.id)
- .cloned()
- .collect(),
- };
- let routes::Events(events) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect("subscribed to valid channels");
+ let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
@@ -187,41 +105,14 @@ async fn includes_multiple_channels() {
.immediately()
.await;
- for (channel, message) in messages {
- assert!(events.iter().any(|routes::ReplayableEvent(_, event)| {
- event.channel == channel.id && event.message == message
- }));
+ for message in &messages {
+ assert!(events
+ .iter()
+ .any(|types::ResumableEvent(_, event)| { event == message }));
}
}
#[tokio::test]
-async fn nonexistent_channel() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = channel::Id::generate();
-
- // Call the endpoint
-
- let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [channel.clone()].into(),
- };
- let routes::ErrorResponse(error) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect_err("subscribed to nonexistent channel");
-
- // Verify the structure of the response.
-
- assert!(matches!(
- error,
- app::EventsError::ChannelNotFound(error_channel) if error_channel == channel
- ));
-}
-
-#[tokio::test]
async fn sequential_messages() {
// Set up the environment
@@ -239,30 +130,24 @@ async fn sequential_messages() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [channel.id.clone()].into(),
- };
- let routes::Events(events) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect("subscribed to a valid channel");
+ let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
- let mut events = events.filter(|routes::ReplayableEvent(_, event)| {
- future::ready(messages.contains(&event.message))
- });
+ let mut events =
+ events.filter(|types::ResumableEvent(_, event)| future::ready(messages.contains(event)));
// Verify delivery in order
for message in &messages {
- let routes::ReplayableEvent(_, event) = events
+ let types::ResumableEvent(_, event) = events
.next()
.immediately()
.await
.expect("undelivered messages remaining");
- assert_eq!(channel.id, event.channel);
- assert_eq!(message, &event.message);
+ assert_eq!(message, &event);
}
}
@@ -285,42 +170,28 @@ async fn resumes_from() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [channel.id.clone()].into(),
- };
let resume_at = {
// First subscription
- let routes::Events(mut events) = routes::events(
- State(app.clone()),
- subscribed_at,
- subscriber.clone(),
- None,
- Query(query.clone()),
- )
- .await
- .expect("subscribed to a valid channel");
+ let routes::Events(mut events) =
+ routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None)
+ .await
+ .expect("subscribe never fails");
- let routes::ReplayableEvent(id, event) =
+ let types::ResumableEvent(last_event_id, event) =
events.next().immediately().await.expect("delivered events");
- assert_eq!(channel.id, event.channel);
- assert_eq!(initial_message, event.message);
+ assert_eq!(initial_message, event);
- id
+ last_event_id
};
// Resume after disconnect
let reconnect_at = fixtures::now();
- let routes::Events(resumed) = routes::events(
- State(app),
- reconnect_at,
- subscriber,
- Some(resume_at.into()),
- Query(query),
- )
- .await
- .expect("subscribed to a valid channel");
+ let routes::Events(resumed) =
+ routes::events(State(app), reconnect_at, subscriber, Some(resume_at.into()))
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
@@ -330,11 +201,10 @@ async fn resumes_from() {
.immediately()
.await;
- for message in later_messages {
- assert!(events.iter().any(
- |routes::ReplayableEvent(_, event)| event.channel == channel.id
- && event.message == message
- ));
+ for message in &later_messages {
+ assert!(events
+ .iter()
+ .any(|types::ResumableEvent(_, event)| event == message));
}
}
@@ -365,9 +235,6 @@ async fn serial_resume() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let query = routes::EventsQuery {
- channels: [channel_a.id.clone(), channel_b.id.clone()].into(),
- };
let resume_at = {
let initial_messages = [
@@ -377,15 +244,10 @@ async fn serial_resume() {
// First subscription
let subscribed_at = fixtures::now();
- let routes::Events(events) = routes::events(
- State(app.clone()),
- subscribed_at,
- subscriber.clone(),
- None,
- Query(query.clone()),
- )
- .await
- .expect("subscribed to a valid channel");
+ let routes::Events(events) =
+ routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None)
+ .await
+ .expect("subscribe never fails");
let events = events
.take(initial_messages.len())
@@ -393,13 +255,13 @@ async fn serial_resume() {
.immediately()
.await;
- for message in initial_messages {
+ for message in &initial_messages {
assert!(events
.iter()
- .any(|routes::ReplayableEvent(_, event)| event.message == message));
+ .any(|types::ResumableEvent(_, event)| event == message));
}
- let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty");
+ let types::ResumableEvent(id, _) = events.last().expect("this vec is non-empty");
id.to_owned()
};
@@ -421,10 +283,9 @@ async fn serial_resume() {
resubscribed_at,
subscriber.clone(),
Some(resume_at.into()),
- Query(query.clone()),
)
.await
- .expect("subscribed to a valid channel");
+ .expect("subscribe never fails");
let events = events
.take(resume_messages.len())
@@ -432,13 +293,13 @@ async fn serial_resume() {
.immediately()
.await;
- for message in resume_messages {
+ for message in &resume_messages {
assert!(events
.iter()
- .any(|routes::ReplayableEvent(_, event)| event.message == message));
+ .any(|types::ResumableEvent(_, event)| event == message));
}
- let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty");
+ let types::ResumableEvent(id, _) = events.last().expect("this vec is non-empty");
id.to_owned()
};
@@ -460,10 +321,9 @@ async fn serial_resume() {
resubscribed_at,
subscriber.clone(),
Some(resume_at.into()),
- Query(query.clone()),
)
.await
- .expect("subscribed to a valid channel");
+ .expect("subscribe never fails");
let events = events
.take(final_messages.len())
@@ -473,10 +333,10 @@ async fn serial_resume() {
// This set of messages, in particular, _should not_ include any prior
// messages from `initial_messages` or `resume_messages`.
- for message in final_messages {
+ for message in &final_messages {
assert!(events
.iter()
- .any(|routes::ReplayableEvent(_, event)| event.message == message));
+ .any(|types::ResumableEvent(_, event)| event == message));
}
};
}
@@ -495,22 +355,18 @@ async fn removes_expired_messages() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [channel.id.clone()].into(),
- };
- let routes::Events(mut events) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect("subscribed to valid channel");
+
+ let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the semantics
- let routes::ReplayableEvent(_, event) = events
+ let types::ResumableEvent(_, event) = events
.next()
.immediately()
.await
.expect("delivered messages");
- assert_eq!(channel.id, event.channel);
- assert_eq!(message, event.message);
+ assert_eq!(message, event);
}