use chrono::TimeDelta; use futures::{ Stream, future, stream::{self, StreamExt as _}, }; use sqlx::sqlite::SqlitePool; use super::{ Broadcaster, Event as TokenEvent, Id, Secret, repo::{self, Provider as _, auth::Provider as _}, }; use crate::{ clock::DateTime, db::NotFound as _, name::{self, Name}, user::{Password, User, repo::Provider as _}, }; pub struct Tokens<'a> { db: &'a SqlitePool, token_events: &'a Broadcaster, } impl<'a> Tokens<'a> { pub const fn new(db: &'a SqlitePool, token_events: &'a Broadcaster) -> Self { Self { db, token_events } } pub async fn login( &self, name: &Name, password: &Password, login_at: &DateTime, ) -> Result<(User, Secret), LoginError> { let mut tx = self.db.begin().await?; let (user, stored_hash) = tx .auth() .for_name(name) .await .optional()? .ok_or(LoginError::Rejected)?; // Split the transaction here to avoid holding the tx open (potentially blocking // other writes) while we do the fairly expensive task of verifying the // password. It's okay if the token issuance transaction happens some notional // amount of time after retrieving the login, as inserting the token will fail // if the account is deleted during that time. tx.commit().await?; let snapshot = user.as_snapshot().ok_or(LoginError::Rejected)?; let token = if stored_hash.verify(password)? { let mut tx = self.db.begin().await?; let token = tx.tokens().issue(&user, login_at).await?; tx.commit().await?; token } else { Err(LoginError::Rejected)? }; Ok((snapshot, token)) } pub async fn change_password( &self, user: &User, password: &Password, to: &Password, changed_at: &DateTime, ) -> Result<(User, Secret), LoginError> { let mut tx = self.db.begin().await?; let (user, stored_hash) = tx .auth() .for_user(user) .await .optional()? .ok_or(LoginError::Rejected)?; // Split the transaction here to avoid holding the tx open (potentially blocking // other writes) while we do the fairly expensive task of verifying the // password. It's okay if the token issuance transaction happens some notional // amount of time after retrieving the login, as inserting the token will fail // if the account is deleted during that time. tx.commit().await?; if !stored_hash.verify(password)? { return Err(LoginError::Rejected); } let snapshot = user.as_snapshot().ok_or(LoginError::Rejected)?; let to_hash = to.hash()?; let mut tx = self.db.begin().await?; let tokens = tx.tokens().revoke_all(&user).await?; tx.users().set_password(&user, &to_hash).await?; let secret = tx.tokens().issue(&user, changed_at).await?; tx.commit().await?; for event in tokens.into_iter().map(TokenEvent::Revoked) { self.token_events.broadcast(event); } Ok((snapshot, secret)) } pub async fn validate( &self, secret: &Secret, used_at: &DateTime, ) -> Result<(Id, User), ValidateError> { let mut tx = self.db.begin().await?; let (token, user) = tx .tokens() .validate(secret, used_at) .await .not_found(|| ValidateError::InvalidToken)?; tx.commit().await?; let user = user.as_snapshot().ok_or(ValidateError::LoginDeleted)?; Ok((token, user)) } pub async fn limit_stream( &self, token: Id, events: S, ) -> Result + std::fmt::Debug + use, ValidateError> where S: Stream + std::fmt::Debug, E: std::fmt::Debug, { // Subscribe, first. let token_events = self.token_events.subscribe(); // Check that the token is valid at this point in time, second. If it is, then // any future revocations will appear in the subscription. If not, bail now. // // It's possible, otherwise, to get to this point with a token that _was_ valid // at the start of the request, but which was invalided _before_ the // `subscribe()` call. In that case, the corresponding revocation event will // simply be missed, since the `token_events` stream subscribed after the fact. // This check cancels guarding the stream here. // // Yes, this is a weird niche edge case. Most things don't double-check, because // they aren't expected to run long enough for the token's revocation to // matter. Supervising a stream, on the other hand, will run for a // _long_ time; if we miss the race here, we'll never actually carry out the // supervision. let mut tx = self.db.begin().await?; tx.tokens() .require(&token) .await .not_found(|| ValidateError::InvalidToken)?; tx.commit().await?; // Then construct the guarded stream. First, project both streams into // `GuardedEvent`. let token_events = token_events .filter(move |event| { future::ready(matches!(event, TokenEvent::Revoked(id) if id == &token)) }) .map(|_| GuardedEvent::TokenRevoked); let events = events.map(|event| GuardedEvent::Event(event)); // Merge the two streams, then unproject them, stopping at // `GuardedEvent::TokenRevoked`. let stream = stream::select(token_events, events).scan((), |(), event| { future::ready(match event { GuardedEvent::Event(event) => Some(event), GuardedEvent::TokenRevoked => None, }) }); Ok(stream) } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 7 days. let expire_at = relative_to.to_owned() - TimeDelta::days(7); let mut tx = self.db.begin().await?; let tokens = tx.tokens().expire(&expire_at).await?; tx.commit().await?; for event in tokens.into_iter().map(TokenEvent::Revoked) { self.token_events.broadcast(event); } Ok(()) } pub async fn logout(&self, token: &Id) -> Result<(), ValidateError> { let mut tx = self.db.begin().await?; tx.tokens().revoke(token).await?; tx.commit().await?; self.token_events .broadcast(TokenEvent::Revoked(token.clone())); Ok(()) } } #[derive(Debug, thiserror::Error)] pub enum LoginError { #[error("invalid login")] Rejected, #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), #[error(transparent)] PasswordHash(#[from] password_hash::Error), } impl From for LoginError { fn from(error: repo::auth::LoadError) -> Self { use repo::auth::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } } #[derive(Debug, thiserror::Error)] pub enum ValidateError { #[error("invalid token")] InvalidToken, #[error("user deleted")] LoginDeleted, #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), } impl From for ValidateError { fn from(error: repo::LoadError) -> Self { match error { repo::LoadError::Database(error) => error.into(), repo::LoadError::Name(error) => error.into(), } } } #[derive(Debug)] enum GuardedEvent { TokenRevoked, Event(E), }