diff options
Diffstat (limited to 'src/event/routes/test.rs')
| -rw-r--r-- | src/event/routes/test.rs | 459 |
1 files changed, 0 insertions, 459 deletions
diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs deleted file mode 100644 index 49f8094..0000000 --- a/src/event/routes/test.rs +++ /dev/null @@ -1,459 +0,0 @@ -use axum::extract::State; -use axum_extra::extract::Query; -use futures::{ - future, - stream::{self, StreamExt as _}, -}; - -use super::get; -use crate::{ - event::Sequenced as _, - test::fixtures::{self, future::Immediately as _}, -}; - -#[tokio::test] -async fn includes_historical_message() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let event = events - .filter_map(fixtures::message::events) - .next() - .immediately() - .await - .expect("delivered stored message"); - - assert!(fixtures::event::message_sent(&event, &message)); -} - -#[tokio::test] -async fn includes_live_message() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the semantics - - let sender = fixtures::login::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - let event = events - .filter_map(fixtures::message::events) - .next() - .immediately() - .await - .expect("delivered live message"); - - assert!(fixtures::event::message_sent(&event, &message)); -} - -#[tokio::test] -async fn includes_multiple_channels() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - let channels = [ - fixtures::channel::create(&app, &fixtures::now()).await, - fixtures::channel::create(&app, &fixtures::now()).await, - ]; - - let messages = stream::iter(channels) - .then(|channel| { - let app = app.clone(); - let sender = sender.clone(); - let channel = channel.clone(); - async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } - }) - .collect::<Vec<_>>() - .await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let events = events - .filter_map(fixtures::message::events) - .take(messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - for message in &messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } -} - -#[tokio::test] -async fn sequential_messages() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - let messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let mut events = events - .filter_map(fixtures::message::events) - .filter(|event| { - future::ready( - messages - .iter() - .any(|message| fixtures::event::message_sent(event, message)), - ) - }); - - // Verify delivery in order - for message in &messages { - let event = events - .next() - .immediately() - .await - .expect("undelivered messages remaining"); - - assert!(fixtures::event::message_sent(&event, message)); - } -} - -#[tokio::test] -async fn resumes_from() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - let later_messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let resume_at = { - // First subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query::default(), - ) - .await - .expect("subscribe never fails"); - - let event = events - .filter_map(fixtures::message::events) - .next() - .immediately() - .await - .expect("delivered events"); - - assert!(fixtures::event::message_sent(&event, &initial_message)); - - event.sequence() - }; - - // Resume after disconnect - let get::Response(resumed) = get::handler( - State(app), - subscriber, - Some(resume_at.into()), - Query::default(), - ) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let events = resumed - .filter_map(fixtures::message::events) - .take(later_messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - for message in &later_messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } -} - -// This test verifies a real bug I hit developing the vector-of-sequences -// approach to resuming events. A small omission caused the event IDs in a -// resumed stream to _omit_ channels that were in the original stream until -// those channels also appeared in the resumed stream. -// -// Clients would see something like -// * In the original stream, Cfoo=5,Cbar=8 -// * In the resumed stream, Cfoo=6 (no Cbar sequence number) -// -// Disconnecting and reconnecting a second time, using event IDs from that -// initial period of the first resume attempt, would then cause the second -// resume attempt to restart all other channels from the beginning, and not -// from where the first disconnection happened. -// -// This is a real and valid behaviour for clients! -#[tokio::test] -async fn serial_resume() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; - let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let resume_at = { - let initial_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, - ]; - - // First subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query::default(), - ) - .await - .expect("subscribe never fails"); - - let events = events - .filter_map(fixtures::message::events) - .take(initial_messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - for message in &initial_messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } - - let event = events.last().expect("this vec is non-empty"); - - event.sequence() - }; - - // Resume after disconnect - let resume_at = { - let resume_messages = [ - // Note that channel_b does not appear here. The buggy behaviour - // would be masked if channel_b happened to send a new message - // into the resumed event stream. - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - ]; - - // Second subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - Some(resume_at.into()), - Query::default(), - ) - .await - .expect("subscribe never fails"); - - let events = events - .filter_map(fixtures::message::events) - .take(resume_messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - for message in &resume_messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } - - let event = events.last().expect("this vec is non-empty"); - - event.sequence() - }; - - // Resume after disconnect a second time - { - // At this point, we can send on either channel and demonstrate the - // problem. The resume point should before both of these messages, but - // after _all_ prior messages. - let final_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, - ]; - - // Third subscription - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - Some(resume_at.into()), - Query::default(), - ) - .await - .expect("subscribe never fails"); - - let events = events - .filter_map(fixtures::message::events) - .take(final_messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - // This set of messages, in particular, _should not_ include any prior - // messages from `initial_messages` or `resume_messages`. - for message in &final_messages { - assert!(events - .iter() - .any(|event| fixtures::event::message_sent(event, message))); - } - }; -} - -#[tokio::test] -async fn terminates_on_token_expiry() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - // Subscribe via the endpoint - - let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; - let subscriber = - fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; - - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); - - // Verify the resulting stream's behaviour - - app.tokens() - .expire(&fixtures::now()) - .await - .expect("expiring tokens succeeds"); - - // These should not be delivered. - let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - assert!(events - .filter_map(fixtures::message::events) - .filter(|event| future::ready( - messages - .iter() - .any(|message| fixtures::event::message_sent(event, message)) - )) - .next() - .immediately() - .await - .is_none()); -} - -#[tokio::test] -async fn terminates_on_logout() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; - - // Subscribe via the endpoint - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - let get::Response(events) = get::handler( - State(app.clone()), - subscriber.clone(), - None, - Query::default(), - ) - .await - .expect("subscribe never fails"); - - // Verify the resulting stream's behaviour - - app.tokens() - .logout(&subscriber.token) - .await - .expect("expiring tokens succeeds"); - - // These should not be delivered. - let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - ]; - - assert!(events - .filter_map(fixtures::message::events) - .filter(|event| future::ready( - messages - .iter() - .any(|message| fixtures::event::message_sent(event, message)) - )) - .next() - .immediately() - .await - .is_none()); -} |
