summaryrefslogtreecommitdiff
path: root/src/events/routes/test.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/routes/test.rs')
-rw-r--r--src/events/routes/test.rs197
1 files changed, 173 insertions, 24 deletions
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index df2d5f6..131c751 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -22,7 +22,9 @@ async fn no_subscriptions() {
// Call the endpoint
let subscribed_at = fixtures::now();
- let query = routes::EventsQuery { channels: vec![] };
+ let query = routes::EventsQuery {
+ channels: [].into(),
+ };
let routes::Events(mut events) =
routes::events(State(app), subscribed_at, subscriber, None, Query(query))
.await
@@ -47,7 +49,7 @@ async fn includes_historical_message() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![channel.id.clone()],
+ channels: [channel.id.clone()].into(),
};
let routes::Events(mut events) =
routes::events(State(app), subscribed_at, subscriber, None, Query(query))
@@ -56,7 +58,7 @@ async fn includes_historical_message() {
// Verify the structure of the response.
- let event = events
+ let routes::ReplayableEvent(_, event) = events
.next()
.immediately()
.await
@@ -78,7 +80,7 @@ async fn includes_live_message() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![channel.id.clone()],
+ channels: [channel.id.clone()].into(),
};
let routes::Events(mut events) = routes::events(
State(app.clone()),
@@ -95,7 +97,7 @@ async fn includes_live_message() {
let sender = fixtures::login::create(&app).await;
let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
- let event = events
+ let routes::ReplayableEvent(_, event) = events
.next()
.immediately()
.await
@@ -121,7 +123,7 @@ async fn excludes_other_channels() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![subscribed.id.clone()],
+ channels: [subscribed.id.clone()].into(),
};
let routes::Events(mut events) =
routes::events(State(app), subscribed_at, subscriber, None, Query(query))
@@ -130,7 +132,7 @@ async fn excludes_other_channels() {
// Verify the semantics
- let event = events
+ let routes::ReplayableEvent(_, event) = events
.next()
.immediately()
.await
@@ -186,9 +188,9 @@ async fn includes_multiple_channels() {
.await;
for (channel, message) in messages {
- assert!(events
- .iter()
- .any(|event| { event.channel == channel.id && event.message == message }));
+ assert!(events.iter().any(|routes::ReplayableEvent(_, event)| {
+ event.channel == channel.id && event.message == message
+ }));
}
}
@@ -204,7 +206,7 @@ async fn nonexitent_channel() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![channel.clone()],
+ channels: [channel.clone()].into(),
};
let routes::ErrorResponse(error) =
routes::events(State(app), subscribed_at, subscriber, None, Query(query))
@@ -239,7 +241,7 @@ async fn sequential_messages() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![channel.id.clone()],
+ channels: [channel.id.clone()].into(),
};
let routes::Events(events) =
routes::events(State(app), subscribed_at, subscriber, None, Query(query))
@@ -248,11 +250,13 @@ async fn sequential_messages() {
// Verify the structure of the response.
- let mut events = events.filter(|event| future::ready(messages.contains(&event.message)));
+ let mut events = events.filter(|routes::ReplayableEvent(_, event)| {
+ future::ready(messages.contains(&event.message))
+ });
// Verify delivery in order
for message in &messages {
- let event = events
+ let routes::ReplayableEvent(_, event) = events
.next()
.immediately()
.await
@@ -283,7 +287,7 @@ async fn resumes_from() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![channel.id.clone()],
+ channels: [channel.id.clone()].into(),
};
let resume_at = {
@@ -298,19 +302,20 @@ async fn resumes_from() {
.await
.expect("subscribed to a valid channel");
- let event = events.next().immediately().await.expect("delivered events");
+ let routes::ReplayableEvent(id, event) =
+ events.next().immediately().await.expect("delivered events");
assert_eq!(channel.id, event.channel);
assert_eq!(initial_message, event.message);
- event.event_id()
+ id
};
// Resume after disconnect
- let resumed_at = fixtures::now();
+ let reconnect_at = fixtures::now();
let routes::Events(resumed) = routes::events(
State(app),
- resumed_at,
+ reconnect_at,
subscriber,
Some(resume_at.into()),
Query(query),
@@ -327,12 +332,156 @@ async fn resumes_from() {
.await;
for message in later_messages {
- assert!(events
- .iter()
- .any(|event| event.channel == channel.id && event.message == message));
+ assert!(events.iter().any(
+ |routes::ReplayableEvent(_, event)| event.channel == channel.id
+ && event.message == message
+ ));
}
}
+// This test verifies a real bug I hit developing the vector-of-sequences
+// approach to resuming events. A small omission caused the event IDs in a
+// resumed stream to _omit_ channels that were in the original stream until
+// those channels also appeared in the resumed stream.
+//
+// Clients would see something like
+// * In the original stream, Cfoo=5,Cbar=8
+// * In the resumed stream, Cfoo=6 (no Cbar sequence number)
+//
+// Disconnecting and reconnecting a second time, using event IDs from that
+// initial period of the first resume attempt, would then cause the second
+// resume attempt to restart all other channels from the beginning, and not
+// from where the first disconnection happened.
+//
+// This is a real and valid behaviour for clients!
+#[tokio::test]
+async fn serial_resume() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel_a = fixtures::channel::create(&app).await;
+ let channel_b = fixtures::channel::create(&app).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let query = routes::EventsQuery {
+ channels: [channel_a.id.clone(), channel_b.id.clone()].into(),
+ };
+
+ let resume_at = {
+ let initial_messages = [
+ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
+ ];
+
+ // First subscription
+ let subscribed_at = fixtures::now();
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscribed_at,
+ subscriber.clone(),
+ None,
+ Query(query.clone()),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ let events = events
+ .take(initial_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in initial_messages {
+ assert!(events
+ .iter()
+ .any(|routes::ReplayableEvent(_, event)| event.message == message));
+ }
+
+ let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty");
+
+ id.to_owned()
+ };
+
+ // Resume after disconnect
+ let resume_at = {
+ let resume_messages = [
+ // Note that channel_b does not appear here. The buggy behaviour
+ // would be masked if channel_b happened to send a new message
+ // into the resumed event stream.
+ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
+ ];
+
+ // Second subscription
+ let resubscribed_at = fixtures::now();
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ resubscribed_at,
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query(query.clone()),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ let events = events
+ .take(resume_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in resume_messages {
+ assert!(events
+ .iter()
+ .any(|routes::ReplayableEvent(_, event)| event.message == message));
+ }
+
+ let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty");
+
+ id.to_owned()
+ };
+
+ // Resume after disconnect a second time
+ {
+ // At this point, we can send on either channel and demonstrate the
+ // problem. The resume point should before both of these messages, but
+ // after _all_ prior messages.
+ let final_messages = [
+ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
+ ];
+
+ // Second subscription
+ let resubscribed_at = fixtures::now();
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ resubscribed_at,
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query(query.clone()),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ let events = events
+ .take(final_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ // This set of messages, in particular, _should not_ include any prior
+ // messages from `initial_messages` or `resume_messages`.
+ for message in final_messages {
+ assert!(events
+ .iter()
+ .any(|routes::ReplayableEvent(_, event)| event.message == message));
+ }
+ };
+}
+
#[tokio::test]
async fn removes_expired_messages() {
// Set up the environment
@@ -348,7 +497,7 @@ async fn removes_expired_messages() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![channel.id.clone()],
+ channels: [channel.id.clone()].into_iter().collect(),
};
let routes::Events(mut events) =
routes::events(State(app), subscribed_at, subscriber, None, Query(query))
@@ -357,7 +506,7 @@ async fn removes_expired_messages() {
// Verify the semantics
- let event = events
+ let routes::ReplayableEvent(_, event) = events
.next()
.immediately()
.await