use axum::extract::State; use axum_extra::extract::Query; use futures::{future, stream::StreamExt as _}; use crate::{ event::routes::get, test::fixtures::{self, future::Expect as _}, }; #[tokio::test] async fn terminates_on_token_expiry() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; // Subscribe via the endpoint let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; let get::Response(events) = get::handler(State(app.clone()), subscriber, None, Query::default()) .await .expect("subscribe never fails"); // Verify the resulting stream's behaviour app.tokens() .expire(&fixtures::now()) .await .expect("expiring tokens succeeds"); // These should not be delivered. let messages = [ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; events .filter_map(fixtures::event::message) .filter_map(fixtures::event::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .expect_none("end of stream") .await; } #[tokio::test] async fn terminates_on_logout() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; // Subscribe via the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; let get::Response(events) = get::handler( State(app.clone()), subscriber.clone(), None, Query::default(), ) .await .expect("subscribe never fails"); // Verify the resulting stream's behaviour app.tokens() .logout(&subscriber.token) .await .expect("expiring tokens succeeds"); // These should not be delivered. let messages = [ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; events .filter_map(fixtures::event::message) .filter_map(fixtures::event::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .expect_none("end of stream") .await; } #[tokio::test] async fn terminates_on_password_change() { // Set up the environment let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; // Subscribe via the endpoint let creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let cookie = fixtures::cookie::logged_in(&app, &creds, &fixtures::now()).await; let subscriber = fixtures::identity::from_cookie(&app, &cookie, &fixtures::now()).await; let get::Response(events) = get::handler( State(app.clone()), subscriber.clone(), None, Query::default(), ) .await .expect("subscribe never fails"); // Verify the resulting stream's behaviour let (_, password) = creds; let to = fixtures::login::propose_password(); app.tokens() .change_password(&subscriber.login, &password, &to, &fixtures::now()) .await .expect("expiring tokens succeeds"); // These should not be delivered. let messages = [ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; events .filter_map(fixtures::event::message) .filter_map(fixtures::event::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .expect_none("end of stream") .await; }