summaryrefslogtreecommitdiff
path: root/src/event/handlers/stream/test/token.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/handlers/stream/test/token.rs')
-rw-r--r--src/event/handlers/stream/test/token.rs148
1 files changed, 148 insertions, 0 deletions
diff --git a/src/event/handlers/stream/test/token.rs b/src/event/handlers/stream/test/token.rs
new file mode 100644
index 0000000..2008323
--- /dev/null
+++ b/src/event/handlers/stream/test/token.rs
@@ -0,0 +1,148 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{future, stream::StreamExt as _};
+
+use crate::test::fixtures::{self, future::Expect as _};
+
+#[tokio::test]
+async fn terminates_on_token_expiry() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber_creds = fixtures::user::create_with_password(&app, &fixtures::now()).await;
+ let subscriber =
+ fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await;
+
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.tokens()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
+ .next()
+ .expect_none("end of stream")
+ .await;
+}
+
+#[tokio::test]
+async fn terminates_on_logout() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.tokens()
+ .logout(&subscriber.token)
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
+ .next()
+ .expect_none("end of stream")
+ .await;
+}
+
+#[tokio::test]
+async fn terminates_on_password_change() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe via the endpoint
+
+ let creds = fixtures::user::create_with_password(&app, &fixtures::now()).await;
+ let cookie = fixtures::cookie::logged_in(&app, &creds, &fixtures::now()).await;
+ let subscriber = fixtures::identity::from_cookie(&app, &cookie, &fixtures::now()).await;
+
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ let (_, password) = creds;
+ let to = fixtures::user::propose_password();
+ app.tokens()
+ .change_password(&subscriber.user, &password, &to, &fixtures::now())
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
+ .next()
+ .expect_none("end of stream")
+ .await;
+}