From 60b711c844f8624348d5d1dac3a625532a8e2a82 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Sep 2024 23:03:46 -0400 Subject: Delete expired messages out of band. Trying to reliably do expiry mid-request was causing some anomalies: * Creating a channel with a dup name would fail, then succeed after listing channels. It was very hard to reason about which operations needed to trigger expiry, to fix this "correctly," so now expiry runs on every request. --- src/events/routes/test.rs | 91 +++++++++++++++-------------------------------- 1 file changed, 28 insertions(+), 63 deletions(-) (limited to 'src/events/routes') diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index 55ada95..a6e2275 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -21,14 +21,14 @@ async fn includes_historical_message() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None) + let routes::Events(events) = routes::events(State(app), subscriber, None) .await .expect("subscribe never fails"); // Verify the structure of the response. let types::ResumableEvent(_, event) = events + .filter(fixtures::filter::messages()) .next() .immediately() .await @@ -47,11 +47,9 @@ async fn includes_live_message() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let routes::Events(mut events) = - routes::events(State(app.clone()), subscribed_at, subscriber, None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = routes::events(State(app.clone()), subscriber, None) + .await + .expect("subscribe never fails"); // Verify the semantics @@ -59,6 +57,7 @@ async fn includes_live_message() { let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; let types::ResumableEvent(_, event) = events + .filter(fixtures::filter::messages()) .next() .immediately() .await @@ -92,14 +91,14 @@ async fn includes_multiple_channels() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None) + let routes::Events(events) = routes::events(State(app), subscriber, None) .await .expect("subscribe never fails"); // Verify the structure of the response. let events = events + .filter(fixtures::filter::messages()) .take(messages.len()) .collect::>() .immediately() @@ -129,8 +128,7 @@ async fn sequential_messages() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None) + let routes::Events(events) = routes::events(State(app), subscriber, None) .await .expect("subscribe never fails"); @@ -169,17 +167,19 @@ async fn resumes_from() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); let resume_at = { // First subscription - let routes::Events(mut events) = - routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) + .await + .expect("subscribe never fails"); - let types::ResumableEvent(last_event_id, event) = - events.next().immediately().await.expect("delivered events"); + let types::ResumableEvent(last_event_id, event) = events + .filter(fixtures::filter::messages()) + .next() + .immediately() + .await + .expect("delivered events"); assert_eq!(initial_message, event); @@ -187,11 +187,9 @@ async fn resumes_from() { }; // Resume after disconnect - let reconnect_at = fixtures::now(); - let routes::Events(resumed) = - routes::events(State(app), reconnect_at, subscriber, Some(resume_at.into())) - .await - .expect("subscribe never fails"); + let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into())) + .await + .expect("subscribe never fails"); // Verify the structure of the response. @@ -243,13 +241,12 @@ async fn serial_resume() { ]; // First subscription - let subscribed_at = fixtures::now(); - let routes::Events(events) = - routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) + .await + .expect("subscribe never fails"); let events = events + .filter(fixtures::filter::messages()) .take(initial_messages.len()) .collect::>() .immediately() @@ -277,10 +274,8 @@ async fn serial_resume() { ]; // Second subscription - let resubscribed_at = fixtures::now(); let routes::Events(events) = routes::events( State(app.clone()), - resubscribed_at, subscriber.clone(), Some(resume_at.into()), ) @@ -288,6 +283,7 @@ async fn serial_resume() { .expect("subscribe never fails"); let events = events + .filter(fixtures::filter::messages()) .take(resume_messages.len()) .collect::>() .immediately() @@ -314,11 +310,9 @@ async fn serial_resume() { fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, ]; - // Second subscription - let resubscribed_at = fixtures::now(); + // Third subscription let routes::Events(events) = routes::events( State(app.clone()), - resubscribed_at, subscriber.clone(), Some(resume_at.into()), ) @@ -326,6 +320,7 @@ async fn serial_resume() { .expect("subscribe never fails"); let events = events + .filter(fixtures::filter::messages()) .take(final_messages.len()) .collect::>() .immediately() @@ -340,33 +335,3 @@ async fn serial_resume() { } }; } - -#[tokio::test] -async fn removes_expired_messages() { - // Set up the environment - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - - let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None) - .await - .expect("subscribe never fails"); - - // Verify the semantics - - let types::ResumableEvent(_, event) = events - .next() - .immediately() - .await - .expect("delivered messages"); - - assert_eq!(message, event); -} -- cgit v1.2.3