summaryrefslogtreecommitdiff
path: root/src/events/routes
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-28 21:55:20 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-29 01:19:19 -0400
commit0b1cb80dd0b0f90c4892de7e7a2d18a076ecbdf2 (patch)
treeb41313dbd92811ffcc87b0af576dc570b5802a1e /src/events/routes
parent4d0bb0709b168a24ab6a8dbc86da45d7503596ee (diff)
Shut down the `/api/events` stream when the user logs out or their token expires.
When tokens are revoked (logout or expiry), the server now publishes an internal event via the new `logins` event broadcaster. These events are used to guard the `/api/events` stream. When a token revocation event arrives for the token used to subscribe to the stream, the stream is cut short, disconnecting the client. In service of this, tokens now have IDs, which are non-confidential values that can be used to discuss tokens without their secrets being passed around unnecessarily. These IDs are not (at this time) exposed to clients, but they could be.
Diffstat (limited to 'src/events/routes')
-rw-r--r--src/events/routes/test.rs57
1 files changed, 51 insertions, 6 deletions
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index a6e2275..0b62b5b 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -20,7 +20,8 @@ async fn includes_historical_message() {
// Call the endpoint
- let subscriber = fixtures::login::create(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
@@ -46,7 +47,8 @@ async fn includes_live_message() {
// Call the endpoint
- let subscriber = fixtures::login::create(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
let routes::Events(events) = routes::events(State(app.clone()), subscriber, None)
.await
.expect("subscribe never fails");
@@ -90,7 +92,8 @@ async fn includes_multiple_channels() {
// Call the endpoint
- let subscriber = fixtures::login::create(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
@@ -127,7 +130,8 @@ async fn sequential_messages() {
// Call the endpoint
- let subscriber = fixtures::login::create(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
@@ -166,7 +170,8 @@ async fn resumes_from() {
// Call the endpoint
- let subscriber = fixtures::login::create(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
let resume_at = {
// First subscription
@@ -232,7 +237,8 @@ async fn serial_resume() {
// Call the endpoint
- let subscriber = fixtures::login::create(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
let resume_at = {
let initial_messages = [
@@ -335,3 +341,42 @@ async fn serial_resume() {
}
};
}
+
+#[tokio::test]
+async fn terminates_on_token_expiry() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber =
+ fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await;
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber, None)
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.logins()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+ let messages = [
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ ];
+
+ assert!(events
+ .filter(|types::ResumableEvent(_, event)| future::ready(messages.contains(event)))
+ .next()
+ .immediately()
+ .await
+ .is_none());
+}