From 0b1cb80dd0b0f90c4892de7e7a2d18a076ecbdf2 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Sat, 28 Sep 2024 21:55:20 -0400 Subject: Shut down the `/api/events` stream when the user logs out or their token expires. When tokens are revoked (logout or expiry), the server now publishes an internal event via the new `logins` event broadcaster. These events are used to guard the `/api/events` stream. When a token revocation event arrives for the token used to subscribe to the stream, the stream is cut short, disconnecting the client. In service of this, tokens now have IDs, which are non-confidential values that can be used to discuss tokens without their secrets being passed around unnecessarily. These IDs are not (at this time) exposed to clients, but they could be. --- src/login/app.rs | 58 ++++++++++++++++++++++++++++----- src/login/broadcaster.rs | 3 ++ src/login/extract.rs | 74 ++++++++++++++++++++++++++++++++++++++++-- src/login/mod.rs | 2 ++ src/login/routes/test/login.rs | 4 +-- src/login/types.rs | 12 +++++++ 6 files changed, 140 insertions(+), 13 deletions(-) create mode 100644 src/login/broadcaster.rs create mode 100644 src/login/types.rs (limited to 'src/login') diff --git a/src/login/app.rs b/src/login/app.rs index f7fec88..b8916a8 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -1,24 +1,30 @@ use chrono::TimeDelta; +use futures::{ + future, + stream::{self, StreamExt as _}, + Stream, +}; use sqlx::sqlite::SqlitePool; -use super::{extract::IdentitySecret, repo::auth::Provider as _}; +use super::{broadcaster::Broadcaster, extract::IdentitySecret, repo::auth::Provider as _, types}; use crate::{ clock::DateTime, password::Password, repo::{ error::NotFound as _, login::{Login, Provider as _}, - token::Provider as _, + token::{self, Provider as _}, }, }; pub struct Logins<'a> { db: &'a SqlitePool, + logins: &'a Broadcaster, } impl<'a> Logins<'a> { - pub const fn new(db: &'a SqlitePool) -> Self { - Self { db } + pub const fn new(db: &'a SqlitePool, logins: &'a Broadcaster) -> Self { + Self { db, logins } } pub async fn login( @@ -63,7 +69,7 @@ impl<'a> Logins<'a> { &self, secret: &IdentitySecret, used_at: &DateTime, - ) -> Result { + ) -> Result<(token::Id, Login), ValidateError> { let mut tx = self.db.begin().await?; let login = tx .tokens() @@ -75,26 +81,56 @@ impl<'a> Logins<'a> { Ok(login) } + pub fn limit_stream( + &self, + token: token::Id, + events: impl Stream + std::fmt::Debug, + ) -> impl Stream + std::fmt::Debug + where + E: std::fmt::Debug, + { + let token_events = self + .logins + .subscribe() + .filter(move |event| future::ready(event.token == token)) + .map(|_| GuardedEvent::TokenRevoked); + + let events = events.map(|event| GuardedEvent::Event(event)); + + stream::select(token_events, events).scan((), |(), event| { + future::ready(match event { + GuardedEvent::Event(event) => Some(event), + GuardedEvent::TokenRevoked => None, + }) + }) + } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 7 days. let expire_at = relative_to.to_owned() - TimeDelta::days(7); let mut tx = self.db.begin().await?; - tx.tokens().expire(&expire_at).await?; + let tokens = tx.tokens().expire(&expire_at).await?; tx.commit().await?; + for event in tokens.into_iter().map(types::TokenRevoked::from) { + self.logins.broadcast(&event); + } + Ok(()) } pub async fn logout(&self, secret: &IdentitySecret) -> Result<(), ValidateError> { let mut tx = self.db.begin().await?; - tx.tokens() + let token = tx + .tokens() .revoke(secret) .await .not_found(|| ValidateError::InvalidToken)?; - tx.commit().await?; + self.logins.broadcast(&types::TokenRevoked::from(token)); + Ok(()) } } @@ -124,3 +160,9 @@ pub enum ValidateError { #[error(transparent)] DatabaseError(#[from] sqlx::Error), } + +#[derive(Debug)] +enum GuardedEvent { + TokenRevoked, + Event(E), +} diff --git a/src/login/broadcaster.rs b/src/login/broadcaster.rs new file mode 100644 index 0000000..8e1fb3a --- /dev/null +++ b/src/login/broadcaster.rs @@ -0,0 +1,3 @@ +use crate::{broadcast, login::types}; + +pub type Broadcaster = broadcast::Broadcaster; diff --git a/src/login/extract.rs b/src/login/extract.rs index 3b31d4c..b585565 100644 --- a/src/login/extract.rs +++ b/src/login/extract.rs @@ -1,12 +1,20 @@ use std::fmt; use axum::{ - extract::FromRequestParts, - http::request::Parts, - response::{IntoResponseParts, ResponseParts}, + extract::{FromRequestParts, State}, + http::{request::Parts, StatusCode}, + response::{IntoResponse, IntoResponseParts, Response, ResponseParts}, }; use axum_extra::extract::cookie::{Cookie, CookieJar}; +use crate::{ + app::App, + clock::RequestedAt, + error::Internal, + login::app::ValidateError, + repo::{login::Login, token}, +}; + // The usage pattern here - receive the extractor as an argument, return it in // the response - is heavily modelled after CookieJar's own intended usage. #[derive(Clone)] @@ -112,3 +120,63 @@ where Self(value.into()) } } + +#[derive(Clone, Debug)] +pub struct Identity { + pub token: token::Id, + pub login: Login, +} + +#[async_trait::async_trait] +impl FromRequestParts for Identity { + type Rejection = LoginError; + + async fn from_request_parts(parts: &mut Parts, state: &App) -> Result { + // After Rust 1.82 (and #[feature(min_exhaustive_patterns)] lands on + // stable), the following can be replaced: + // + // ``` + // let Ok(identity_token) = IdentityToken::from_request_parts( + // parts, + // state, + // ).await; + // ``` + let identity_token = IdentityToken::from_request_parts(parts, state).await?; + let RequestedAt(used_at) = RequestedAt::from_request_parts(parts, state).await?; + + let secret = identity_token.secret().ok_or(LoginError::Unauthorized)?; + + let app = State::::from_request_parts(parts, state).await?; + match app.logins().validate(&secret, &used_at).await { + Ok((token, login)) => Ok(Identity { token, login }), + Err(ValidateError::InvalidToken) => Err(LoginError::Unauthorized), + Err(other) => Err(other.into()), + } + } +} + +pub enum LoginError { + Failure(E), + Unauthorized, +} + +impl IntoResponse for LoginError +where + E: IntoResponse, +{ + fn into_response(self) -> Response { + match self { + Self::Unauthorized => (StatusCode::UNAUTHORIZED, "unauthorized").into_response(), + Self::Failure(e) => e.into_response(), + } + } +} + +impl From for LoginError +where + E: Into, +{ + fn from(err: E) -> Self { + Self::Failure(err.into()) + } +} diff --git a/src/login/mod.rs b/src/login/mod.rs index 191cce0..6ae82ac 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -1,6 +1,8 @@ pub use self::routes::router; pub mod app; +pub mod broadcaster; pub mod extract; mod repo; mod routes; +pub mod types; diff --git a/src/login/routes/test/login.rs b/src/login/routes/test/login.rs index 10c17d6..81653ff 100644 --- a/src/login/routes/test/login.rs +++ b/src/login/routes/test/login.rs @@ -36,7 +36,7 @@ async fn new_identity() { // Verify the semantics let validated_at = fixtures::now(); - let validated = app + let (_, validated) = app .logins() .validate(&secret, &validated_at) .await @@ -73,7 +73,7 @@ async fn existing_identity() { // Verify the semantics let validated_at = fixtures::now(); - let validated_login = app + let (_, validated_login) = app .logins() .validate(&secret, &validated_at) .await diff --git a/src/login/types.rs b/src/login/types.rs new file mode 100644 index 0000000..7c7cbf9 --- /dev/null +++ b/src/login/types.rs @@ -0,0 +1,12 @@ +use crate::repo::token; + +#[derive(Clone, Debug)] +pub struct TokenRevoked { + pub token: token::Id, +} + +impl From for TokenRevoked { + fn from(token: token::Id) -> Self { + Self { token } + } +} -- cgit v1.2.3 From 6c054c5b8d43a818ccfa9087960dc19b286e6bb7 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Sun, 29 Sep 2024 02:02:41 -0400 Subject: Reimplement the logout machinery in terms of token IDs, not token secrets. This (a) reduces the amount of passing secrets around that's needed, and (b) allows tests to log out in a more straightforwards manner. Ish. The fixtures are a mess, but so is the nomenclature. Fix the latter and the former will probably follow. --- ...15b487ef0e138989e0b82cd4ff4f7187af6fb529d2.json | 20 ---------- ...e6eeff67a7bfb8402c09651f4034beb487c3d7d58f.json | 20 ++++++++++ src/events/routes/test.rs | 43 ++++++++++++++++++++++ src/login/app.rs | 11 ++---- src/login/routes.rs | 17 ++++++--- src/login/routes/test/logout.rs | 30 ++++++++++----- src/repo/token.rs | 10 ++--- src/test/fixtures/identity.rs | 12 +++--- 8 files changed, 111 insertions(+), 52 deletions(-) delete mode 100644 .sqlx/query-8b057bc13014ee61cc2abc15b487ef0e138989e0b82cd4ff4f7187af6fb529d2.json create mode 100644 .sqlx/query-b5305455d7a3bcd21d39d6e6eeff67a7bfb8402c09651f4034beb487c3d7d58f.json (limited to 'src/login') diff --git a/.sqlx/query-8b057bc13014ee61cc2abc15b487ef0e138989e0b82cd4ff4f7187af6fb529d2.json b/.sqlx/query-8b057bc13014ee61cc2abc15b487ef0e138989e0b82cd4ff4f7187af6fb529d2.json deleted file mode 100644 index 0d3dcdf..0000000 --- a/.sqlx/query-8b057bc13014ee61cc2abc15b487ef0e138989e0b82cd4ff4f7187af6fb529d2.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n delete\n from token\n where secret = $1\n returning id as \"id: Id\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false - ] - }, - "hash": "8b057bc13014ee61cc2abc15b487ef0e138989e0b82cd4ff4f7187af6fb529d2" -} diff --git a/.sqlx/query-b5305455d7a3bcd21d39d6e6eeff67a7bfb8402c09651f4034beb487c3d7d58f.json b/.sqlx/query-b5305455d7a3bcd21d39d6e6eeff67a7bfb8402c09651f4034beb487c3d7d58f.json new file mode 100644 index 0000000..1be8e07 --- /dev/null +++ b/.sqlx/query-b5305455d7a3bcd21d39d6e6eeff67a7bfb8402c09651f4034beb487c3d7d58f.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n delete\n from token\n where id = $1\n returning id as \"id: Id\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + }, + "hash": "b5305455d7a3bcd21d39d6e6eeff67a7bfb8402c09651f4034beb487c3d7d58f" +} diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index 0b62b5b..820192d 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -355,6 +355,7 @@ async fn terminates_on_token_expiry() { let subscriber_creds = fixtures::login::create_with_password(&app).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await; + let routes::Events(events) = routes::events(State(app.clone()), subscriber, None) .await .expect("subscribe never fails"); @@ -380,3 +381,45 @@ async fn terminates_on_token_expiry() { .await .is_none()); } + +#[tokio::test] +async fn terminates_on_logout() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app).await; + + // Subscribe via the endpoint + + let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_token = + fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::now()).await; + let subscriber = + fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await; + + let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) + .await + .expect("subscribe never fails"); + + // Verify the resulting stream's behaviour + + app.logins() + .logout(&subscriber.token) + .await + .expect("expiring tokens succeeds"); + + // These should not be delivered. + let messages = [ + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + ]; + + assert!(events + .filter(|types::ResumableEvent(_, event)| future::ready(messages.contains(event))) + .next() + .immediately() + .await + .is_none()); +} diff --git a/src/login/app.rs b/src/login/app.rs index b8916a8..182c62c 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -120,16 +120,13 @@ impl<'a> Logins<'a> { Ok(()) } - pub async fn logout(&self, secret: &IdentitySecret) -> Result<(), ValidateError> { + pub async fn logout(&self, token: &token::Id) -> Result<(), ValidateError> { let mut tx = self.db.begin().await?; - let token = tx - .tokens() - .revoke(secret) - .await - .not_found(|| ValidateError::InvalidToken)?; + tx.tokens().revoke(token).await?; tx.commit().await?; - self.logins.broadcast(&types::TokenRevoked::from(token)); + self.logins + .broadcast(&types::TokenRevoked::from(token.clone())); Ok(()) } diff --git a/src/login/routes.rs b/src/login/routes.rs index 4664063..8d9e938 100644 --- a/src/login/routes.rs +++ b/src/login/routes.rs @@ -78,27 +78,32 @@ struct LogoutRequest {} async fn on_logout( State(app): State, + RequestedAt(now): RequestedAt, identity: IdentityToken, // This forces the only valid request to be `{}`, and not the infinite // variation allowed when there's no body extractor. Json(LogoutRequest {}): Json, ) -> Result<(IdentityToken, StatusCode), LogoutError> { if let Some(secret) = identity.secret() { - app.logins().logout(&secret).await.map_err(LogoutError)?; + let (token, _) = app.logins().validate(&secret, &now).await?; + app.logins().logout(&token).await?; } let identity = identity.clear(); Ok((identity, StatusCode::NO_CONTENT)) } -#[derive(Debug)] -struct LogoutError(app::ValidateError); +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +enum LogoutError { + ValidateError(#[from] app::ValidateError), + DatabaseError(#[from] sqlx::Error), +} impl IntoResponse for LogoutError { fn into_response(self) -> Response { - let Self(error) = self; - match error { - error @ app::ValidateError::InvalidToken => { + match self { + error @ Self::ValidateError(app::ValidateError::InvalidToken) => { (StatusCode::UNAUTHORIZED, error.to_string()).into_response() } other => Internal::from(other).into_response(), diff --git a/src/login/routes/test/logout.rs b/src/login/routes/test/logout.rs index 05594be..20b0d55 100644 --- a/src/login/routes/test/logout.rs +++ b/src/login/routes/test/logout.rs @@ -22,6 +22,7 @@ async fn successful() { let (response_identity, response_status) = routes::on_logout( State(app.clone()), + fixtures::now(), identity.clone(), Json(routes::LogoutRequest {}), ) @@ -57,10 +58,14 @@ async fn no_identity() { // Call the endpoint let identity = fixtures::identity::not_logged_in(); - let (identity, status) = - routes::on_logout(State(app), identity, Json(routes::LogoutRequest {})) - .await - .expect("logged out with no token"); + let (identity, status) = routes::on_logout( + State(app), + fixtures::now(), + identity, + Json(routes::LogoutRequest {}), + ) + .await + .expect("logged out with no token"); // Verify the return value's basic structure @@ -77,12 +82,19 @@ async fn invalid_token() { // Call the endpoint let identity = fixtures::identity::fictitious(); - let routes::LogoutError(error) = - routes::on_logout(State(app), identity, Json(routes::LogoutRequest {})) - .await - .expect_err("logged out with an invalid token"); + let error = routes::on_logout( + State(app), + fixtures::now(), + identity, + Json(routes::LogoutRequest {}), + ) + .await + .expect_err("logged out with an invalid token"); // Verify the return value's basic structure - assert!(matches!(error, app::ValidateError::InvalidToken)); + assert!(matches!( + error, + routes::LogoutError::ValidateError(app::ValidateError::InvalidToken) + )); } diff --git a/src/repo/token.rs b/src/repo/token.rs index 5f39e1d..d96c094 100644 --- a/src/repo/token.rs +++ b/src/repo/token.rs @@ -48,20 +48,20 @@ impl<'c> Tokens<'c> { } // Revoke a token by its secret. - pub async fn revoke(&mut self, secret: &IdentitySecret) -> Result { - let token = sqlx::query_scalar!( + pub async fn revoke(&mut self, token: &Id) -> Result<(), sqlx::Error> { + sqlx::query_scalar!( r#" delete from token - where secret = $1 + where id = $1 returning id as "id: Id" "#, - secret, + token, ) .fetch_one(&mut *self.0) .await?; - Ok(token) + Ok(()) } // Expire and delete all tokens that haven't been used more recently than diff --git a/src/test/fixtures/identity.rs b/src/test/fixtures/identity.rs index bdd7881..633fb8a 100644 --- a/src/test/fixtures/identity.rs +++ b/src/test/fixtures/identity.rs @@ -22,11 +22,8 @@ pub async fn logged_in(app: &App, login: &(String, Password), now: &RequestedAt) IdentityToken::new().set(token) } -pub async fn identity(app: &App, login: &(String, Password), issued_at: &RequestedAt) -> Identity { - let secret = logged_in(app, login, issued_at) - .await - .secret() - .expect("successful login generates a secret"); +pub async fn from_token(app: &App, token: &IdentityToken, issued_at: &RequestedAt) -> Identity { + let secret = token.secret().expect("identity token has a secret"); let (token, login) = app .logins() .validate(&secret, issued_at) @@ -36,6 +33,11 @@ pub async fn identity(app: &App, login: &(String, Password), issued_at: &Request Identity { token, login } } +pub async fn identity(app: &App, login: &(String, Password), issued_at: &RequestedAt) -> Identity { + let secret = logged_in(app, login, issued_at).await; + from_token(app, &secret, issued_at).await +} + pub fn secret(identity: &IdentityToken) -> IdentitySecret { identity.secret().expect("identity contained a secret") } -- cgit v1.2.3