diff options
| author | Kit La Touche <kit@transneptune.net> | 2024-09-28 21:55:50 -0400 |
|---|---|---|
| committer | Kit La Touche <kit@transneptune.net> | 2024-09-28 21:55:50 -0400 |
| commit | 897eef0306917baf3662e691b29f182d35805296 (patch) | |
| tree | 024e2a3fa13ac96e0b4339a6d62ae533efe7db07 /src/events/routes/test.rs | |
| parent | c524b333befc8cc97aa49f73b3ed28bc3b82420c (diff) | |
| parent | 4d0bb0709b168a24ab6a8dbc86da45d7503596ee (diff) | |
Merge branch 'main' into feature-frontend
Diffstat (limited to 'src/events/routes/test.rs')
| -rw-r--r-- | src/events/routes/test.rs | 332 |
1 files changed, 76 insertions, 256 deletions
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index 0b08fd6..a6e2275 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -1,70 +1,40 @@ use axum::extract::State; -use axum_extra::extract::Query; use futures::{ future, stream::{self, StreamExt as _}, }; use crate::{ - events::{app, routes}, - repo::channel::{self}, + events::{routes, types}, test::fixtures::{self, future::Immediately as _}, }; #[tokio::test] -async fn no_subscriptions() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let subscriber = fixtures::login::create(&app).await; - - // Call the endpoint - - let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [].into(), - }; - let routes::Events(mut events) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect("empty subscription"); - - // Verify the structure of the response. - - assert!(events.next().immediately().await.is_none()); -} - -#[tokio::test] async fn includes_historical_message() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [channel.id.clone()].into(), - }; - let routes::Events(mut events) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect("subscribed to valid channel"); + let routes::Events(events) = routes::events(State(app), subscriber, None) + .await + .expect("subscribe never fails"); // Verify the structure of the response. - let routes::ReplayableEvent(_, event) = events + let types::ResumableEvent(_, event) = events + .filter(fixtures::filter::messages()) .next() .immediately() .await .expect("delivered stored message"); - assert_eq!(channel.id, event.channel); - assert_eq!(message, event.message); + assert_eq!(message, event); } #[tokio::test] @@ -72,74 +42,28 @@ async fn includes_live_message() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [channel.id.clone()].into(), - }; - let routes::Events(mut events) = routes::events( - State(app.clone()), - subscribed_at, - subscriber, - None, - Query(query), - ) - .await - .expect("subscribed to a valid channel"); + let routes::Events(events) = routes::events(State(app.clone()), subscriber, None) + .await + .expect("subscribe never fails"); // Verify the semantics let sender = fixtures::login::create(&app).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - let routes::ReplayableEvent(_, event) = events + let types::ResumableEvent(_, event) = events + .filter(fixtures::filter::messages()) .next() .immediately() .await .expect("delivered live message"); - assert_eq!(channel.id, event.channel); - assert_eq!(message, event.message); -} - -#[tokio::test] -async fn excludes_other_channels() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let subscribed_channel = fixtures::channel::create(&app).await; - let unsubscribed_channel = fixtures::channel::create(&app).await; - let sender = fixtures::login::create(&app).await; - let message = - fixtures::message::send(&app, &sender, &subscribed_channel, &fixtures::now()).await; - fixtures::message::send(&app, &sender, &unsubscribed_channel, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [subscribed_channel.id.clone()].into(), - }; - let routes::Events(mut events) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect("subscribed to a valid channel"); - - // Verify the semantics - - let routes::ReplayableEvent(_, event) = events - .next() - .immediately() - .await - .expect("delivered at least one message"); - - assert_eq!(subscribed_channel.id, event.channel); - assert_eq!(message, event.message); + assert_eq!(message, event); } #[tokio::test] @@ -150,15 +74,16 @@ async fn includes_multiple_channels() { let sender = fixtures::login::create(&app).await; let channels = [ - fixtures::channel::create(&app).await, - fixtures::channel::create(&app).await, + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, ]; let messages = stream::iter(channels) - .then(|channel| async { - let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - - (channel, message) + .then(|channel| { + let app = app.clone(); + let sender = sender.clone(); + let channel = channel.clone(); + async move { fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await } }) .collect::<Vec<_>>() .await; @@ -166,68 +91,32 @@ async fn includes_multiple_channels() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: messages - .iter() - .map(|(channel, _)| &channel.id) - .cloned() - .collect(), - }; - let routes::Events(events) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect("subscribed to valid channels"); + let routes::Events(events) = routes::events(State(app), subscriber, None) + .await + .expect("subscribe never fails"); // Verify the structure of the response. let events = events + .filter(fixtures::filter::messages()) .take(messages.len()) .collect::<Vec<_>>() .immediately() .await; - for (channel, message) in messages { - assert!(events.iter().any(|routes::ReplayableEvent(_, event)| { - event.channel == channel.id && event.message == message - })); + for message in &messages { + assert!(events + .iter() + .any(|types::ResumableEvent(_, event)| { event == message })); } } #[tokio::test] -async fn nonexistent_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = channel::Id::generate(); - - // Call the endpoint - - let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [channel.clone()].into(), - }; - let routes::ErrorResponse(error) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect_err("subscribed to nonexistent channel"); - - // Verify the structure of the response. - - fixtures::error::expected!( - error, - app::EventsError::ChannelNotFound(error_channel), - assert_eq!(channel, error_channel) - ); -} - -#[tokio::test] async fn sequential_messages() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; let messages = vec![ @@ -239,31 +128,24 @@ async fn sequential_messages() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [channel.id.clone()].into(), - }; - let routes::Events(events) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect("subscribed to a valid channel"); + let routes::Events(events) = routes::events(State(app), subscriber, None) + .await + .expect("subscribe never fails"); // Verify the structure of the response. - let mut events = events.filter(|routes::ReplayableEvent(_, event)| { - future::ready(messages.contains(&event.message)) - }); + let mut events = + events.filter(|types::ResumableEvent(_, event)| future::ready(messages.contains(event))); // Verify delivery in order for message in &messages { - let routes::ReplayableEvent(_, event) = events + let types::ResumableEvent(_, event) = events .next() .immediately() .await .expect("undelivered messages remaining"); - assert_eq!(channel.id, event.channel); - assert_eq!(message, &event.message); + assert_eq!(message, &event); } } @@ -272,7 +154,7 @@ async fn resumes_from() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; @@ -285,43 +167,29 @@ async fn resumes_from() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [channel.id.clone()].into(), - }; let resume_at = { // First subscription - let routes::Events(mut events) = routes::events( - State(app.clone()), - subscribed_at, - subscriber.clone(), - None, - Query(query.clone()), - ) - .await - .expect("subscribed to a valid channel"); + let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) + .await + .expect("subscribe never fails"); - let routes::ReplayableEvent(id, event) = - events.next().immediately().await.expect("delivered events"); + let types::ResumableEvent(last_event_id, event) = events + .filter(fixtures::filter::messages()) + .next() + .immediately() + .await + .expect("delivered events"); - assert_eq!(channel.id, event.channel); - assert_eq!(initial_message, event.message); + assert_eq!(initial_message, event); - id + last_event_id }; // Resume after disconnect - let reconnect_at = fixtures::now(); - let routes::Events(resumed) = routes::events( - State(app), - reconnect_at, - subscriber, - Some(resume_at.into()), - Query(query), - ) - .await - .expect("subscribed to a valid channel"); + let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into())) + .await + .expect("subscribe never fails"); // Verify the structure of the response. @@ -331,11 +199,10 @@ async fn resumes_from() { .immediately() .await; - for message in later_messages { - assert!(events.iter().any( - |routes::ReplayableEvent(_, event)| event.channel == channel.id - && event.message == message - )); + for message in &later_messages { + assert!(events + .iter() + .any(|types::ResumableEvent(_, event)| event == message)); } } @@ -360,15 +227,12 @@ async fn serial_resume() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel_a = fixtures::channel::create(&app).await; - let channel_b = fixtures::channel::create(&app).await; + let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; + let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let query = routes::EventsQuery { - channels: [channel_a.id.clone(), channel_b.id.clone()].into(), - }; let resume_at = { let initial_messages = [ @@ -377,30 +241,24 @@ async fn serial_resume() { ]; // First subscription - let subscribed_at = fixtures::now(); - let routes::Events(events) = routes::events( - State(app.clone()), - subscribed_at, - subscriber.clone(), - None, - Query(query.clone()), - ) - .await - .expect("subscribed to a valid channel"); + let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) + .await + .expect("subscribe never fails"); let events = events + .filter(fixtures::filter::messages()) .take(initial_messages.len()) .collect::<Vec<_>>() .immediately() .await; - for message in initial_messages { + for message in &initial_messages { assert!(events .iter() - .any(|routes::ReplayableEvent(_, event)| event.message == message)); + .any(|types::ResumableEvent(_, event)| event == message)); } - let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty"); + let types::ResumableEvent(id, _) = events.last().expect("this vec is non-empty"); id.to_owned() }; @@ -416,30 +274,28 @@ async fn serial_resume() { ]; // Second subscription - let resubscribed_at = fixtures::now(); let routes::Events(events) = routes::events( State(app.clone()), - resubscribed_at, subscriber.clone(), Some(resume_at.into()), - Query(query.clone()), ) .await - .expect("subscribed to a valid channel"); + .expect("subscribe never fails"); let events = events + .filter(fixtures::filter::messages()) .take(resume_messages.len()) .collect::<Vec<_>>() .immediately() .await; - for message in resume_messages { + for message in &resume_messages { assert!(events .iter() - .any(|routes::ReplayableEvent(_, event)| event.message == message)); + .any(|types::ResumableEvent(_, event)| event == message)); } - let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty"); + let types::ResumableEvent(id, _) = events.last().expect("this vec is non-empty"); id.to_owned() }; @@ -454,19 +310,17 @@ async fn serial_resume() { fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, ]; - // Second subscription - let resubscribed_at = fixtures::now(); + // Third subscription let routes::Events(events) = routes::events( State(app.clone()), - resubscribed_at, subscriber.clone(), Some(resume_at.into()), - Query(query.clone()), ) .await - .expect("subscribed to a valid channel"); + .expect("subscribe never fails"); let events = events + .filter(fixtures::filter::messages()) .take(final_messages.len()) .collect::<Vec<_>>() .immediately() @@ -474,44 +328,10 @@ async fn serial_resume() { // This set of messages, in particular, _should not_ include any prior // messages from `initial_messages` or `resume_messages`. - for message in final_messages { + for message in &final_messages { assert!(events .iter() - .any(|routes::ReplayableEvent(_, event)| event.message == message)); + .any(|types::ResumableEvent(_, event)| event == message)); } }; } - -#[tokio::test] -async fn removes_expired_messages() { - // Set up the environment - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; - - fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { - channels: [channel.id.clone()].into(), - }; - let routes::Events(mut events) = - routes::events(State(app), subscribed_at, subscriber, None, Query(query)) - .await - .expect("subscribed to valid channel"); - - // Verify the semantics - - let routes::ReplayableEvent(_, event) = events - .next() - .immediately() - .await - .expect("delivered messages"); - - assert_eq!(channel.id, event.channel); - assert_eq!(message, event.message); -} |
