summaryrefslogtreecommitdiff
path: root/src/events/routes/test.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/routes/test.rs')
-rw-r--r--src/events/routes/test.rs91
1 files changed, 28 insertions, 63 deletions
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index 55ada95..a6e2275 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -21,14 +21,14 @@ async fn includes_historical_message() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
// Verify the structure of the response.
let types::ResumableEvent(_, event) = events
+ .filter(fixtures::filter::messages())
.next()
.immediately()
.await
@@ -47,11 +47,9 @@ async fn includes_live_message() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(mut events) =
- routes::events(State(app.clone()), subscribed_at, subscriber, None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the semantics
@@ -59,6 +57,7 @@ async fn includes_live_message() {
let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
let types::ResumableEvent(_, event) = events
+ .filter(fixtures::filter::messages())
.next()
.immediately()
.await
@@ -92,14 +91,14 @@ async fn includes_multiple_channels() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
// Verify the structure of the response.
let events = events
+ .filter(fixtures::filter::messages())
.take(messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -129,8 +128,7 @@ async fn sequential_messages() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
@@ -169,17 +167,19 @@ async fn resumes_from() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
let resume_at = {
// First subscription
- let routes::Events(mut events) =
- routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
+ .await
+ .expect("subscribe never fails");
- let types::ResumableEvent(last_event_id, event) =
- events.next().immediately().await.expect("delivered events");
+ let types::ResumableEvent(last_event_id, event) = events
+ .filter(fixtures::filter::messages())
+ .next()
+ .immediately()
+ .await
+ .expect("delivered events");
assert_eq!(initial_message, event);
@@ -187,11 +187,9 @@ async fn resumes_from() {
};
// Resume after disconnect
- let reconnect_at = fixtures::now();
- let routes::Events(resumed) =
- routes::events(State(app), reconnect_at, subscriber, Some(resume_at.into()))
- .await
- .expect("subscribe never fails");
+ let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into()))
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
@@ -243,13 +241,12 @@ async fn serial_resume() {
];
// First subscription
- let subscribed_at = fixtures::now();
- let routes::Events(events) =
- routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
+ .await
+ .expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(initial_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -277,10 +274,8 @@ async fn serial_resume() {
];
// Second subscription
- let resubscribed_at = fixtures::now();
let routes::Events(events) = routes::events(
State(app.clone()),
- resubscribed_at,
subscriber.clone(),
Some(resume_at.into()),
)
@@ -288,6 +283,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(resume_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -314,11 +310,9 @@ async fn serial_resume() {
fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
];
- // Second subscription
- let resubscribed_at = fixtures::now();
+ // Third subscription
let routes::Events(events) = routes::events(
State(app.clone()),
- resubscribed_at,
subscriber.clone(),
Some(resume_at.into()),
)
@@ -326,6 +320,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(final_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -340,33 +335,3 @@ async fn serial_resume() {
}
};
}
-
-#[tokio::test]
-async fn removes_expired_messages() {
- // Set up the environment
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await;
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
-
- let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None)
- .await
- .expect("subscribe never fails");
-
- // Verify the semantics
-
- let types::ResumableEvent(_, event) = events
- .next()
- .immediately()
- .await
- .expect("delivered messages");
-
- assert_eq!(message, event);
-}