summaryrefslogtreecommitdiff
path: root/src/events/routes
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-27 23:03:46 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-28 01:00:12 -0400
commit60b711c844f8624348d5d1dac3a625532a8e2a82 (patch)
treea667cfa3833046425a87ec03c700d6124af70e4e /src/events/routes
parent08c3a6e77a3f61ffc9643a5e1f840df9078d0b36 (diff)
Delete expired messages out of band.
Trying to reliably do expiry mid-request was causing some anomalies: * Creating a channel with a dup name would fail, then succeed after listing channels. It was very hard to reason about which operations needed to trigger expiry, to fix this "correctly," so now expiry runs on every request.
Diffstat (limited to 'src/events/routes')
-rw-r--r--src/events/routes/test.rs91
1 files changed, 28 insertions, 63 deletions
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index 55ada95..a6e2275 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -21,14 +21,14 @@ async fn includes_historical_message() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
// Verify the structure of the response.
let types::ResumableEvent(_, event) = events
+ .filter(fixtures::filter::messages())
.next()
.immediately()
.await
@@ -47,11 +47,9 @@ async fn includes_live_message() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(mut events) =
- routes::events(State(app.clone()), subscribed_at, subscriber, None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the semantics
@@ -59,6 +57,7 @@ async fn includes_live_message() {
let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
let types::ResumableEvent(_, event) = events
+ .filter(fixtures::filter::messages())
.next()
.immediately()
.await
@@ -92,14 +91,14 @@ async fn includes_multiple_channels() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
// Verify the structure of the response.
let events = events
+ .filter(fixtures::filter::messages())
.take(messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -129,8 +128,7 @@ async fn sequential_messages() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
@@ -169,17 +167,19 @@ async fn resumes_from() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
let resume_at = {
// First subscription
- let routes::Events(mut events) =
- routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
+ .await
+ .expect("subscribe never fails");
- let types::ResumableEvent(last_event_id, event) =
- events.next().immediately().await.expect("delivered events");
+ let types::ResumableEvent(last_event_id, event) = events
+ .filter(fixtures::filter::messages())
+ .next()
+ .immediately()
+ .await
+ .expect("delivered events");
assert_eq!(initial_message, event);
@@ -187,11 +187,9 @@ async fn resumes_from() {
};
// Resume after disconnect
- let reconnect_at = fixtures::now();
- let routes::Events(resumed) =
- routes::events(State(app), reconnect_at, subscriber, Some(resume_at.into()))
- .await
- .expect("subscribe never fails");
+ let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into()))
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
@@ -243,13 +241,12 @@ async fn serial_resume() {
];
// First subscription
- let subscribed_at = fixtures::now();
- let routes::Events(events) =
- routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
+ .await
+ .expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(initial_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -277,10 +274,8 @@ async fn serial_resume() {
];
// Second subscription
- let resubscribed_at = fixtures::now();
let routes::Events(events) = routes::events(
State(app.clone()),
- resubscribed_at,
subscriber.clone(),
Some(resume_at.into()),
)
@@ -288,6 +283,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(resume_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -314,11 +310,9 @@ async fn serial_resume() {
fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
];
- // Second subscription
- let resubscribed_at = fixtures::now();
+ // Third subscription
let routes::Events(events) = routes::events(
State(app.clone()),
- resubscribed_at,
subscriber.clone(),
Some(resume_at.into()),
)
@@ -326,6 +320,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(final_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -340,33 +335,3 @@ async fn serial_resume() {
}
};
}
-
-#[tokio::test]
-async fn removes_expired_messages() {
- // Set up the environment
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await;
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
-
- let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None)
- .await
- .expect("subscribe never fails");
-
- // Verify the semantics
-
- let types::ResumableEvent(_, event) = events
- .next()
- .immediately()
- .await
- .expect("delivered messages");
-
- assert_eq!(message, event);
-}