From 5d3392799f88c5a3d3f9c656c73d6e8ac5c4d793 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 2 Oct 2024 01:02:58 -0400 Subject: Split login and token handling. --- src/app.rs | 21 +++-- src/event/routes.rs | 5 +- src/event/routes/test.rs | 4 +- src/expire.rs | 2 +- src/login/app.rs | 169 ++------------------------------------- src/login/broadcaster.rs | 3 - src/login/mod.rs | 4 +- src/login/repo.rs | 50 ++++++++++++ src/login/repo/auth.rs | 50 ------------ src/login/repo/mod.rs | 1 - src/login/routes.rs | 10 +-- src/login/routes/test/login.rs | 13 ++- src/login/routes/test/logout.rs | 7 +- src/login/types.rs | 12 --- src/repo/login.rs | 50 ------------ src/repo/mod.rs | 2 - src/repo/token.rs | 151 ----------------------------------- src/test/fixtures/identity.rs | 4 +- src/token/app.rs | 170 ++++++++++++++++++++++++++++++++++++++++ src/token/broadcaster.rs | 4 + src/token/event.rs | 12 +++ src/token/extract/identity.rs | 6 +- src/token/mod.rs | 4 + src/token/repo/auth.rs | 50 ++++++++++++ src/token/repo/mod.rs | 4 + src/token/repo/token.rs | 151 +++++++++++++++++++++++++++++++++++ 26 files changed, 486 insertions(+), 473 deletions(-) delete mode 100644 src/login/broadcaster.rs create mode 100644 src/login/repo.rs delete mode 100644 src/login/repo/auth.rs delete mode 100644 src/login/repo/mod.rs delete mode 100644 src/login/types.rs delete mode 100644 src/repo/login.rs delete mode 100644 src/repo/token.rs create mode 100644 src/token/app.rs create mode 100644 src/token/broadcaster.rs create mode 100644 src/token/event.rs create mode 100644 src/token/repo/auth.rs create mode 100644 src/token/repo/mod.rs create mode 100644 src/token/repo/token.rs (limited to 'src') diff --git a/src/app.rs b/src/app.rs index 84a6357..5542e5f 100644 --- a/src/app.rs +++ b/src/app.rs @@ -3,34 +3,39 @@ use sqlx::sqlite::SqlitePool; use crate::{ channel::app::Channels, event::{app::Events, broadcaster::Broadcaster as EventBroadcaster}, - login::{app::Logins, broadcaster::Broadcaster as LoginBroadcaster}, + login::app::Logins, + token::{app::Tokens, broadcaster::Broadcaster as TokenBroadcaster}, }; #[derive(Clone)] pub struct App { db: SqlitePool, events: EventBroadcaster, - logins: LoginBroadcaster, + tokens: TokenBroadcaster, } impl App { pub fn from(db: SqlitePool) -> Self { let events = EventBroadcaster::default(); - let logins = LoginBroadcaster::default(); - Self { db, events, logins } + let tokens = TokenBroadcaster::default(); + Self { db, events, tokens } } } impl App { - pub const fn logins(&self) -> Logins { - Logins::new(&self.db, &self.logins) + pub const fn channels(&self) -> Channels { + Channels::new(&self.db, &self.events) } pub const fn events(&self) -> Events { Events::new(&self.db, &self.events) } - pub const fn channels(&self) -> Channels { - Channels::new(&self.db, &self.events) + pub const fn logins(&self) -> Logins { + Logins::new(&self.db) + } + + pub const fn tokens(&self) -> Tokens { + Tokens::new(&self.db, &self.tokens) } } diff --git a/src/event/routes.rs b/src/event/routes.rs index 77761ca..50ac435 100644 --- a/src/event/routes.rs +++ b/src/event/routes.rs @@ -15,8 +15,7 @@ use crate::{ app::App, error::{Internal, Unauthorized}, event::Sequence, - login::app::ValidateError, - token::extract::Identity, + token::{app::ValidateError, extract::Identity}, }; #[cfg(test)] @@ -42,7 +41,7 @@ async fn events( .or(query.resume_point); let stream = app.events().subscribe(resume_at).await?; - let stream = app.logins().limit_stream(identity.token, stream).await?; + let stream = app.tokens().limit_stream(identity.token, stream).await?; Ok(Events(stream)) } diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs index 9a3b12a..d1ac3b4 100644 --- a/src/event/routes/test.rs +++ b/src/event/routes/test.rs @@ -371,7 +371,7 @@ async fn terminates_on_token_expiry() { // Verify the resulting stream's behaviour - app.logins() + app.tokens() .expire(&fixtures::now()) .await .expect("expiring tokens succeeds"); @@ -418,7 +418,7 @@ async fn terminates_on_logout() { // Verify the resulting stream's behaviour - app.logins() + app.tokens() .logout(&subscriber.token) .await .expect("expiring tokens succeeds"); diff --git a/src/expire.rs b/src/expire.rs index 16006d1..a8eb8ad 100644 --- a/src/expire.rs +++ b/src/expire.rs @@ -13,7 +13,7 @@ pub async fn middleware( req: Request, next: Next, ) -> Result { - app.logins().expire(&expired_at).await?; + app.tokens().expire(&expired_at).await?; app.events().expire(&expired_at).await?; app.channels().expire(&expired_at).await?; Ok(next.run(req).await) diff --git a/src/login/app.rs b/src/login/app.rs index 60475af..69c1055 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -1,30 +1,17 @@ -use chrono::TimeDelta; -use futures::{ - future, - stream::{self, StreamExt as _}, - Stream, -}; use sqlx::sqlite::SqlitePool; -use super::{broadcaster::Broadcaster, repo::auth::Provider as _, types, Login}; -use crate::{ - clock::DateTime, - event::Sequence, - login::Password, - repo::{ - error::NotFound as _, login::Provider as _, sequence::Provider as _, token::Provider as _, - }, - token::{self, Secret}, -}; +use crate::{event::Sequence, repo::sequence::Provider as _}; + +#[cfg(test)] +use super::{repo::Provider as _, Login, Password}; pub struct Logins<'a> { db: &'a SqlitePool, - logins: &'a Broadcaster, } impl<'a> Logins<'a> { - pub const fn new(db: &'a SqlitePool, logins: &'a Broadcaster) -> Self { - Self { db, logins } + pub const fn new(db: &'a SqlitePool) -> Self { + Self { db } } pub async fn boot_point(&self) -> Result { @@ -35,33 +22,6 @@ impl<'a> Logins<'a> { Ok(sequence) } - pub async fn login( - &self, - name: &str, - password: &Password, - login_at: &DateTime, - ) -> Result { - let mut tx = self.db.begin().await?; - - let login = if let Some((login, stored_hash)) = tx.auth().for_name(name).await? { - if stored_hash.verify(password)? { - // Password verified; use the login. - login - } else { - // Password NOT verified. - return Err(LoginError::Rejected); - } - } else { - let password_hash = password.hash()?; - tx.logins().create(name, &password_hash).await? - }; - - let token = tx.tokens().issue(&login, login_at).await?; - tx.commit().await?; - - Ok(token) - } - #[cfg(test)] pub async fn create(&self, name: &str, password: &Password) -> Result { let password_hash = password.hash()?; @@ -72,109 +32,6 @@ impl<'a> Logins<'a> { Ok(login) } - - pub async fn validate( - &self, - secret: &Secret, - used_at: &DateTime, - ) -> Result<(token::Id, Login), ValidateError> { - let mut tx = self.db.begin().await?; - let login = tx - .tokens() - .validate(secret, used_at) - .await - .not_found(|| ValidateError::InvalidToken)?; - tx.commit().await?; - - Ok(login) - } - - pub async fn limit_stream( - &self, - token: token::Id, - events: impl Stream + std::fmt::Debug, - ) -> Result + std::fmt::Debug, ValidateError> - where - E: std::fmt::Debug, - { - // Subscribe, first. - let token_events = self.logins.subscribe(); - - // Check that the token is valid at this point in time, second. If it is, then - // any future revocations will appear in the subscription. If not, bail now. - // - // It's possible, otherwise, to get to this point with a token that _was_ valid - // at the start of the request, but which was invalided _before_ the - // `subscribe()` call. In that case, the corresponding revocation event will - // simply be missed, since the `token_events` stream subscribed after the fact. - // This check cancels guarding the stream here. - // - // Yes, this is a weird niche edge case. Most things don't double-check, because - // they aren't expected to run long enough for the token's revocation to - // matter. Supervising a stream, on the other hand, will run for a - // _long_ time; if we miss the race here, we'll never actually carry out the - // supervision. - let mut tx = self.db.begin().await?; - tx.tokens() - .require(&token) - .await - .not_found(|| ValidateError::InvalidToken)?; - tx.commit().await?; - - // Then construct the guarded stream. First, project both streams into - // `GuardedEvent`. - let token_events = token_events - .filter(move |event| future::ready(event.token == token)) - .map(|_| GuardedEvent::TokenRevoked); - let events = events.map(|event| GuardedEvent::Event(event)); - - // Merge the two streams, then unproject them, stopping at - // `GuardedEvent::TokenRevoked`. - let stream = stream::select(token_events, events).scan((), |(), event| { - future::ready(match event { - GuardedEvent::Event(event) => Some(event), - GuardedEvent::TokenRevoked => None, - }) - }); - - Ok(stream) - } - - pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { - // Somewhat arbitrarily, expire after 7 days. - let expire_at = relative_to.to_owned() - TimeDelta::days(7); - - let mut tx = self.db.begin().await?; - let tokens = tx.tokens().expire(&expire_at).await?; - tx.commit().await?; - - for event in tokens.into_iter().map(types::TokenRevoked::from) { - self.logins.broadcast(&event); - } - - Ok(()) - } - - pub async fn logout(&self, token: &token::Id) -> Result<(), ValidateError> { - let mut tx = self.db.begin().await?; - tx.tokens().revoke(token).await?; - tx.commit().await?; - - self.logins - .broadcast(&types::TokenRevoked::from(token.clone())); - - Ok(()) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum LoginError { - #[error("invalid login")] - Rejected, - #[error(transparent)] - DatabaseError(#[from] sqlx::Error), - #[error(transparent)] - PasswordHashError(#[from] password_hash::Error), } #[cfg(test)] @@ -184,17 +41,3 @@ pub enum CreateError { DatabaseError(#[from] sqlx::Error), PasswordHashError(#[from] password_hash::Error), } - -#[derive(Debug, thiserror::Error)] -pub enum ValidateError { - #[error("invalid token")] - InvalidToken, - #[error(transparent)] - DatabaseError(#[from] sqlx::Error), -} - -#[derive(Debug)] -enum GuardedEvent { - TokenRevoked, - Event(E), -} diff --git a/src/login/broadcaster.rs b/src/login/broadcaster.rs deleted file mode 100644 index 8e1fb3a..0000000 --- a/src/login/broadcaster.rs +++ /dev/null @@ -1,3 +0,0 @@ -use crate::{broadcast, login::types}; - -pub type Broadcaster = broadcast::Broadcaster; diff --git a/src/login/mod.rs b/src/login/mod.rs index 91c1821..65e3ada 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -1,11 +1,9 @@ pub mod app; -pub mod broadcaster; pub mod extract; mod id; pub mod password; -mod repo; +pub mod repo; mod routes; -pub mod types; pub use self::{id::Id, password::Password, routes::router}; diff --git a/src/login/repo.rs b/src/login/repo.rs new file mode 100644 index 0000000..d1a02c4 --- /dev/null +++ b/src/login/repo.rs @@ -0,0 +1,50 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::login::{password::StoredHash, Id, Login}; + +pub trait Provider { + fn logins(&mut self) -> Logins; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn logins(&mut self) -> Logins { + Logins(self) + } +} + +pub struct Logins<'t>(&'t mut SqliteConnection); + +impl<'c> Logins<'c> { + pub async fn create( + &mut self, + name: &str, + password_hash: &StoredHash, + ) -> Result { + let id = Id::generate(); + + let login = sqlx::query_as!( + Login, + r#" + insert or fail + into login (id, name, password_hash) + values ($1, $2, $3) + returning + id as "id: Id", + name + "#, + id, + name, + password_hash, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(login) + } +} + +impl<'t> From<&'t mut SqliteConnection> for Logins<'t> { + fn from(tx: &'t mut SqliteConnection) -> Self { + Self(tx) + } +} diff --git a/src/login/repo/auth.rs b/src/login/repo/auth.rs deleted file mode 100644 index b299697..0000000 --- a/src/login/repo/auth.rs +++ /dev/null @@ -1,50 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::login::{self, password::StoredHash, Login}; - -pub trait Provider { - fn auth(&mut self) -> Auth; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn auth(&mut self) -> Auth { - Auth(self) - } -} - -pub struct Auth<'t>(&'t mut SqliteConnection); - -impl<'t> Auth<'t> { - // Retrieves a login by name, plus its stored password hash for - // verification. If there's no login with the requested name, this will - // return [None]. - pub async fn for_name( - &mut self, - name: &str, - ) -> Result, sqlx::Error> { - let found = sqlx::query!( - r#" - select - id as "id: login::Id", - name, - password_hash as "password_hash: StoredHash" - from login - where name = $1 - "#, - name, - ) - .map(|rec| { - ( - Login { - id: rec.id, - name: rec.name, - }, - rec.password_hash, - ) - }) - .fetch_optional(&mut *self.0) - .await?; - - Ok(found) - } -} diff --git a/src/login/repo/mod.rs b/src/login/repo/mod.rs deleted file mode 100644 index 0e4a05d..0000000 --- a/src/login/repo/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod auth; diff --git a/src/login/routes.rs b/src/login/routes.rs index b571bd5..0874cc3 100644 --- a/src/login/routes.rs +++ b/src/login/routes.rs @@ -11,11 +11,9 @@ use crate::{ clock::RequestedAt, error::{Internal, Unauthorized}, login::{Login, Password}, + token::{app, extract::IdentityToken}, }; -use super::app; -use crate::token::extract::IdentityToken; - #[cfg(test)] mod test; @@ -59,7 +57,7 @@ async fn on_login( Json(request): Json, ) -> Result<(IdentityToken, StatusCode), LoginError> { let token = app - .logins() + .tokens() .login(&request.name, &request.password, &now) .await .map_err(LoginError)?; @@ -95,8 +93,8 @@ async fn on_logout( Json(LogoutRequest {}): Json, ) -> Result<(IdentityToken, StatusCode), LogoutError> { if let Some(secret) = identity.secret() { - let (token, _) = app.logins().validate(&secret, &now).await?; - app.logins().logout(&token).await?; + let (token, _) = app.tokens().validate(&secret, &now).await?; + app.tokens().logout(&token).await?; } let identity = identity.clear(); diff --git a/src/login/routes/test/login.rs b/src/login/routes/test/login.rs index 81653ff..3c82738 100644 --- a/src/login/routes/test/login.rs +++ b/src/login/routes/test/login.rs @@ -3,10 +3,7 @@ use axum::{ http::StatusCode, }; -use crate::{ - login::{app, routes}, - test::fixtures, -}; +use crate::{login::routes, test::fixtures, token::app}; #[tokio::test] async fn new_identity() { @@ -37,7 +34,7 @@ async fn new_identity() { let validated_at = fixtures::now(); let (_, validated) = app - .logins() + .tokens() .validate(&secret, &validated_at) .await .expect("identity secret is valid"); @@ -74,7 +71,7 @@ async fn existing_identity() { let validated_at = fixtures::now(); let (_, validated_login) = app - .logins() + .tokens() .validate(&secret, &validated_at) .await .expect("identity secret is valid"); @@ -127,14 +124,14 @@ async fn token_expires() { // Verify the semantics let expired_at = fixtures::now(); - app.logins() + app.tokens() .expire(&expired_at) .await .expect("expiring tokens never fails"); let verified_at = fixtures::now(); let error = app - .logins() + .tokens() .validate(&secret, &verified_at) .await .expect_err("validating an expired token"); diff --git a/src/login/routes/test/logout.rs b/src/login/routes/test/logout.rs index 20b0d55..42b2534 100644 --- a/src/login/routes/test/logout.rs +++ b/src/login/routes/test/logout.rs @@ -3,10 +3,7 @@ use axum::{ http::StatusCode, }; -use crate::{ - login::{app, routes}, - test::fixtures, -}; +use crate::{login::routes, test::fixtures, token::app}; #[tokio::test] async fn successful() { @@ -37,7 +34,7 @@ async fn successful() { // Verify the semantics let error = app - .logins() + .tokens() .validate(&secret, &now) .await .expect_err("secret is invalid"); diff --git a/src/login/types.rs b/src/login/types.rs deleted file mode 100644 index d53d436..0000000 --- a/src/login/types.rs +++ /dev/null @@ -1,12 +0,0 @@ -use crate::token; - -#[derive(Clone, Debug)] -pub struct TokenRevoked { - pub token: token::Id, -} - -impl From for TokenRevoked { - fn from(token: token::Id) -> Self { - Self { token } - } -} diff --git a/src/repo/login.rs b/src/repo/login.rs deleted file mode 100644 index d1a02c4..0000000 --- a/src/repo/login.rs +++ /dev/null @@ -1,50 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::login::{password::StoredHash, Id, Login}; - -pub trait Provider { - fn logins(&mut self) -> Logins; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn logins(&mut self) -> Logins { - Logins(self) - } -} - -pub struct Logins<'t>(&'t mut SqliteConnection); - -impl<'c> Logins<'c> { - pub async fn create( - &mut self, - name: &str, - password_hash: &StoredHash, - ) -> Result { - let id = Id::generate(); - - let login = sqlx::query_as!( - Login, - r#" - insert or fail - into login (id, name, password_hash) - values ($1, $2, $3) - returning - id as "id: Id", - name - "#, - id, - name, - password_hash, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(login) - } -} - -impl<'t> From<&'t mut SqliteConnection> for Logins<'t> { - fn from(tx: &'t mut SqliteConnection) -> Self { - Self(tx) - } -} diff --git a/src/repo/mod.rs b/src/repo/mod.rs index 69ad82c..7abd46b 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -1,6 +1,4 @@ pub mod channel; pub mod error; -pub mod login; pub mod pool; pub mod sequence; -pub mod token; diff --git a/src/repo/token.rs b/src/repo/token.rs deleted file mode 100644 index 5f64dac..0000000 --- a/src/repo/token.rs +++ /dev/null @@ -1,151 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use uuid::Uuid; - -use crate::{ - clock::DateTime, - login::{self, Login}, - token::{Id, Secret}, -}; - -pub trait Provider { - fn tokens(&mut self) -> Tokens; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn tokens(&mut self) -> Tokens { - Tokens(self) - } -} - -pub struct Tokens<'t>(&'t mut SqliteConnection); - -impl<'c> Tokens<'c> { - // Issue a new token for an existing login. The issued_at timestamp will - // be used to control expiry, until the token is actually used. - pub async fn issue( - &mut self, - login: &Login, - issued_at: &DateTime, - ) -> Result { - let id = Id::generate(); - let secret = Uuid::new_v4().to_string(); - - let secret = sqlx::query_scalar!( - r#" - insert - into token (id, secret, login, issued_at, last_used_at) - values ($1, $2, $3, $4, $4) - returning secret as "secret!: Secret" - "#, - id, - secret, - login.id, - issued_at, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(secret) - } - - pub async fn require(&mut self, token: &Id) -> Result<(), sqlx::Error> { - sqlx::query_scalar!( - r#" - select id as "id: Id" - from token - where id = $1 - "#, - token, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(()) - } - - // Revoke a token by its secret. - pub async fn revoke(&mut self, token: &Id) -> Result<(), sqlx::Error> { - sqlx::query_scalar!( - r#" - delete - from token - where id = $1 - returning id as "id: Id" - "#, - token, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(()) - } - - // Expire and delete all tokens that haven't been used more recently than - // `expire_at`. - pub async fn expire(&mut self, expire_at: &DateTime) -> Result, sqlx::Error> { - let tokens = sqlx::query_scalar!( - r#" - delete - from token - where last_used_at < $1 - returning id as "id: Id" - "#, - expire_at, - ) - .fetch_all(&mut *self.0) - .await?; - - Ok(tokens) - } - - // Validate a token by its secret, retrieving the associated Login record. - // Will return [None] if the token is not valid. The token's last-used - // timestamp will be set to `used_at`. - pub async fn validate( - &mut self, - secret: &Secret, - used_at: &DateTime, - ) -> Result<(Id, Login), sqlx::Error> { - // I would use `update … returning` to do this in one query, but - // sqlite3, as of this writing, does not allow an update's `returning` - // clause to reference columns from tables joined into the update. Two - // queries is fine, but it feels untidy. - sqlx::query!( - r#" - update token - set last_used_at = $1 - where secret = $2 - "#, - used_at, - secret, - ) - .execute(&mut *self.0) - .await?; - - let login = sqlx::query!( - r#" - select - token.id as "token_id: Id", - login.id as "login_id: login::Id", - name as "login_name" - from login - join token on login.id = token.login - where token.secret = $1 - "#, - secret, - ) - .map(|row| { - ( - row.token_id, - Login { - id: row.login_id, - name: row.login_name, - }, - ) - }) - .fetch_one(&mut *self.0) - .await?; - - Ok(login) - } -} diff --git a/src/test/fixtures/identity.rs b/src/test/fixtures/identity.rs index 9e8e403..56b4ffa 100644 --- a/src/test/fixtures/identity.rs +++ b/src/test/fixtures/identity.rs @@ -17,7 +17,7 @@ pub fn not_logged_in() -> IdentityToken { pub async fn logged_in(app: &App, login: &(String, Password), now: &RequestedAt) -> IdentityToken { let (name, password) = login; let token = app - .logins() + .tokens() .login(name, password, now) .await .expect("should succeed given known-valid credentials"); @@ -28,7 +28,7 @@ pub async fn logged_in(app: &App, login: &(String, Password), now: &RequestedAt) pub async fn from_token(app: &App, token: &IdentityToken, issued_at: &RequestedAt) -> Identity { let secret = token.secret().expect("identity token has a secret"); let (token, login) = app - .logins() + .tokens() .validate(&secret, issued_at) .await .expect("always validates newly-issued secret"); diff --git a/src/token/app.rs b/src/token/app.rs new file mode 100644 index 0000000..1477a9f --- /dev/null +++ b/src/token/app.rs @@ -0,0 +1,170 @@ +use chrono::TimeDelta; +use futures::{ + future, + stream::{self, StreamExt as _}, + Stream, +}; +use sqlx::sqlite::SqlitePool; + +use super::{ + broadcaster::Broadcaster, event, repo::auth::Provider as _, repo::Provider as _, Id, Secret, +}; +use crate::{ + clock::DateTime, + login::{repo::Provider as _, Login, Password}, + repo::error::NotFound as _, +}; + +pub struct Tokens<'a> { + db: &'a SqlitePool, + tokens: &'a Broadcaster, +} + +impl<'a> Tokens<'a> { + pub const fn new(db: &'a SqlitePool, tokens: &'a Broadcaster) -> Self { + Self { db, tokens } + } + pub async fn login( + &self, + name: &str, + password: &Password, + login_at: &DateTime, + ) -> Result { + let mut tx = self.db.begin().await?; + + let login = if let Some((login, stored_hash)) = tx.auth().for_name(name).await? { + if stored_hash.verify(password)? { + // Password verified; use the login. + login + } else { + // Password NOT verified. + return Err(LoginError::Rejected); + } + } else { + let password_hash = password.hash()?; + tx.logins().create(name, &password_hash).await? + }; + + let token = tx.tokens().issue(&login, login_at).await?; + tx.commit().await?; + + Ok(token) + } + + pub async fn validate( + &self, + secret: &Secret, + used_at: &DateTime, + ) -> Result<(Id, Login), ValidateError> { + let mut tx = self.db.begin().await?; + let login = tx + .tokens() + .validate(secret, used_at) + .await + .not_found(|| ValidateError::InvalidToken)?; + tx.commit().await?; + + Ok(login) + } + + pub async fn limit_stream( + &self, + token: Id, + events: impl Stream + std::fmt::Debug, + ) -> Result + std::fmt::Debug, ValidateError> + where + E: std::fmt::Debug, + { + // Subscribe, first. + let token_events = self.tokens.subscribe(); + + // Check that the token is valid at this point in time, second. If it is, then + // any future revocations will appear in the subscription. If not, bail now. + // + // It's possible, otherwise, to get to this point with a token that _was_ valid + // at the start of the request, but which was invalided _before_ the + // `subscribe()` call. In that case, the corresponding revocation event will + // simply be missed, since the `token_events` stream subscribed after the fact. + // This check cancels guarding the stream here. + // + // Yes, this is a weird niche edge case. Most things don't double-check, because + // they aren't expected to run long enough for the token's revocation to + // matter. Supervising a stream, on the other hand, will run for a + // _long_ time; if we miss the race here, we'll never actually carry out the + // supervision. + let mut tx = self.db.begin().await?; + tx.tokens() + .require(&token) + .await + .not_found(|| ValidateError::InvalidToken)?; + tx.commit().await?; + + // Then construct the guarded stream. First, project both streams into + // `GuardedEvent`. + let token_events = token_events + .filter(move |event| future::ready(event.token == token)) + .map(|_| GuardedEvent::TokenRevoked); + let events = events.map(|event| GuardedEvent::Event(event)); + + // Merge the two streams, then unproject them, stopping at + // `GuardedEvent::TokenRevoked`. + let stream = stream::select(token_events, events).scan((), |(), event| { + future::ready(match event { + GuardedEvent::Event(event) => Some(event), + GuardedEvent::TokenRevoked => None, + }) + }); + + Ok(stream) + } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 7 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + let tokens = tx.tokens().expire(&expire_at).await?; + tx.commit().await?; + + for event in tokens.into_iter().map(event::TokenRevoked::from) { + self.tokens.broadcast(&event); + } + + Ok(()) + } + + pub async fn logout(&self, token: &Id) -> Result<(), ValidateError> { + let mut tx = self.db.begin().await?; + tx.tokens().revoke(token).await?; + tx.commit().await?; + + self.tokens + .broadcast(&event::TokenRevoked::from(token.clone())); + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum LoginError { + #[error("invalid login")] + Rejected, + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), + #[error(transparent)] + PasswordHashError(#[from] password_hash::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum ValidateError { + #[error("invalid token")] + InvalidToken, + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + +#[derive(Debug)] +enum GuardedEvent { + TokenRevoked, + Event(E), +} diff --git a/src/token/broadcaster.rs b/src/token/broadcaster.rs new file mode 100644 index 0000000..8e2e006 --- /dev/null +++ b/src/token/broadcaster.rs @@ -0,0 +1,4 @@ +use super::event; +use crate::broadcast; + +pub type Broadcaster = broadcast::Broadcaster; diff --git a/src/token/event.rs b/src/token/event.rs new file mode 100644 index 0000000..d53d436 --- /dev/null +++ b/src/token/event.rs @@ -0,0 +1,12 @@ +use crate::token; + +#[derive(Clone, Debug)] +pub struct TokenRevoked { + pub token: token::Id, +} + +impl From for TokenRevoked { + fn from(token: token::Id) -> Self { + Self { token } + } +} diff --git a/src/token/extract/identity.rs b/src/token/extract/identity.rs index 42c7c60..60ad220 100644 --- a/src/token/extract/identity.rs +++ b/src/token/extract/identity.rs @@ -10,8 +10,8 @@ use crate::{ app::App, clock::RequestedAt, error::{Internal, Unauthorized}, - login::{app::ValidateError, Login}, - token, + login::Login, + token::{self, app::ValidateError}, }; #[derive(Clone, Debug)] @@ -40,7 +40,7 @@ impl FromRequestParts for Identity { let secret = identity_token.secret().ok_or(LoginError::Unauthorized)?; let app = State::::from_request_parts(parts, state).await?; - match app.logins().validate(&secret, &used_at).await { + match app.tokens().validate(&secret, &used_at).await { Ok((token, login)) => Ok(Identity { token, login }), Err(ValidateError::InvalidToken) => Err(LoginError::Unauthorized), Err(other) => Err(other.into()), diff --git a/src/token/mod.rs b/src/token/mod.rs index c98b8c2..d122611 100644 --- a/src/token/mod.rs +++ b/src/token/mod.rs @@ -1,5 +1,9 @@ +pub mod app; +pub mod broadcaster; +mod event; pub mod extract; mod id; +mod repo; mod secret; pub use self::{id::Id, secret::Secret}; diff --git a/src/token/repo/auth.rs b/src/token/repo/auth.rs new file mode 100644 index 0000000..b299697 --- /dev/null +++ b/src/token/repo/auth.rs @@ -0,0 +1,50 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::login::{self, password::StoredHash, Login}; + +pub trait Provider { + fn auth(&mut self) -> Auth; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn auth(&mut self) -> Auth { + Auth(self) + } +} + +pub struct Auth<'t>(&'t mut SqliteConnection); + +impl<'t> Auth<'t> { + // Retrieves a login by name, plus its stored password hash for + // verification. If there's no login with the requested name, this will + // return [None]. + pub async fn for_name( + &mut self, + name: &str, + ) -> Result, sqlx::Error> { + let found = sqlx::query!( + r#" + select + id as "id: login::Id", + name, + password_hash as "password_hash: StoredHash" + from login + where name = $1 + "#, + name, + ) + .map(|rec| { + ( + Login { + id: rec.id, + name: rec.name, + }, + rec.password_hash, + ) + }) + .fetch_optional(&mut *self.0) + .await?; + + Ok(found) + } +} diff --git a/src/token/repo/mod.rs b/src/token/repo/mod.rs new file mode 100644 index 0000000..9169743 --- /dev/null +++ b/src/token/repo/mod.rs @@ -0,0 +1,4 @@ +pub mod auth; +mod token; + +pub use self::token::Provider; diff --git a/src/token/repo/token.rs b/src/token/repo/token.rs new file mode 100644 index 0000000..5f64dac --- /dev/null +++ b/src/token/repo/token.rs @@ -0,0 +1,151 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; +use uuid::Uuid; + +use crate::{ + clock::DateTime, + login::{self, Login}, + token::{Id, Secret}, +}; + +pub trait Provider { + fn tokens(&mut self) -> Tokens; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn tokens(&mut self) -> Tokens { + Tokens(self) + } +} + +pub struct Tokens<'t>(&'t mut SqliteConnection); + +impl<'c> Tokens<'c> { + // Issue a new token for an existing login. The issued_at timestamp will + // be used to control expiry, until the token is actually used. + pub async fn issue( + &mut self, + login: &Login, + issued_at: &DateTime, + ) -> Result { + let id = Id::generate(); + let secret = Uuid::new_v4().to_string(); + + let secret = sqlx::query_scalar!( + r#" + insert + into token (id, secret, login, issued_at, last_used_at) + values ($1, $2, $3, $4, $4) + returning secret as "secret!: Secret" + "#, + id, + secret, + login.id, + issued_at, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(secret) + } + + pub async fn require(&mut self, token: &Id) -> Result<(), sqlx::Error> { + sqlx::query_scalar!( + r#" + select id as "id: Id" + from token + where id = $1 + "#, + token, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(()) + } + + // Revoke a token by its secret. + pub async fn revoke(&mut self, token: &Id) -> Result<(), sqlx::Error> { + sqlx::query_scalar!( + r#" + delete + from token + where id = $1 + returning id as "id: Id" + "#, + token, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(()) + } + + // Expire and delete all tokens that haven't been used more recently than + // `expire_at`. + pub async fn expire(&mut self, expire_at: &DateTime) -> Result, sqlx::Error> { + let tokens = sqlx::query_scalar!( + r#" + delete + from token + where last_used_at < $1 + returning id as "id: Id" + "#, + expire_at, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(tokens) + } + + // Validate a token by its secret, retrieving the associated Login record. + // Will return [None] if the token is not valid. The token's last-used + // timestamp will be set to `used_at`. + pub async fn validate( + &mut self, + secret: &Secret, + used_at: &DateTime, + ) -> Result<(Id, Login), sqlx::Error> { + // I would use `update … returning` to do this in one query, but + // sqlite3, as of this writing, does not allow an update's `returning` + // clause to reference columns from tables joined into the update. Two + // queries is fine, but it feels untidy. + sqlx::query!( + r#" + update token + set last_used_at = $1 + where secret = $2 + "#, + used_at, + secret, + ) + .execute(&mut *self.0) + .await?; + + let login = sqlx::query!( + r#" + select + token.id as "token_id: Id", + login.id as "login_id: login::Id", + name as "login_name" + from login + join token on login.id = token.login + where token.secret = $1 + "#, + secret, + ) + .map(|row| { + ( + row.token_id, + Login { + id: row.login_id, + name: row.login_name, + }, + ) + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(login) + } +} -- cgit v1.2.3