use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, Stream, }; use sqlx::sqlite::SqlitePool; use super::{broadcaster::Broadcaster, extract::IdentitySecret, repo::auth::Provider as _, types}; use crate::{ clock::DateTime, password::Password, repo::{ error::NotFound as _, login::{Login, Provider as _}, token::{self, Provider as _}, }, }; pub struct Logins<'a> { db: &'a SqlitePool, logins: &'a Broadcaster, } impl<'a> Logins<'a> { pub const fn new(db: &'a SqlitePool, logins: &'a Broadcaster) -> Self { Self { db, logins } } pub async fn login( &self, name: &str, password: &Password, login_at: &DateTime, ) -> Result { let mut tx = self.db.begin().await?; let login = if let Some((login, stored_hash)) = tx.auth().for_name(name).await? { if stored_hash.verify(password)? { // Password verified; use the login. login } else { // Password NOT verified. return Err(LoginError::Rejected); } } else { let password_hash = password.hash()?; tx.logins().create(name, &password_hash).await? }; let token = tx.tokens().issue(&login, login_at).await?; tx.commit().await?; Ok(token) } #[cfg(test)] pub async fn create(&self, name: &str, password: &Password) -> Result { let password_hash = password.hash()?; let mut tx = self.db.begin().await?; let login = tx.logins().create(name, &password_hash).await?; tx.commit().await?; Ok(login) } pub async fn validate( &self, secret: &IdentitySecret, used_at: &DateTime, ) -> Result<(token::Id, Login), ValidateError> { let mut tx = self.db.begin().await?; let login = tx .tokens() .validate(secret, used_at) .await .not_found(|| ValidateError::InvalidToken)?; tx.commit().await?; Ok(login) } pub async fn limit_stream( &self, token: token::Id, events: impl Stream + std::fmt::Debug, ) -> Result + std::fmt::Debug, ValidateError> where E: std::fmt::Debug, { // Subscribe, first. let token_events = self.logins.subscribe(); // Check that the token is valid at this point in time, second. If it is, then // any future revocations will appear in the subscription. If not, bail now. // // It's possible, otherwise, to get to this point with a token that _was_ valid // at the start of the request, but which was invalided _before_ the // `subscribe()` call. In that case, the corresponding revocation event will // simply be missed, since the `token_events` stream subscribed after the fact. // This check cancels guarding the stream here. // // Yes, this is a weird niche edge case. Most things don't double-check, because // they aren't expected to run long enough for the token's revocation to // matter. Supervising a stream, on the other hand, will run for a // _long_ time; if we miss the race here, we'll never actually carry out the // supervision. let mut tx = self.db.begin().await?; tx.tokens() .require(&token) .await .not_found(|| ValidateError::InvalidToken)?; tx.commit().await?; // Then construct the guarded stream. First, project both streams into // `GuardedEvent`. let token_events = token_events .filter(move |event| future::ready(event.token == token)) .map(|_| GuardedEvent::TokenRevoked); let events = events.map(|event| GuardedEvent::Event(event)); // Merge the two streams, then unproject them, stopping at // `GuardedEvent::TokenRevoked`. let stream = stream::select(token_events, events).scan((), |(), event| { future::ready(match event { GuardedEvent::Event(event) => Some(event), GuardedEvent::TokenRevoked => None, }) }); Ok(stream) } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 7 days. let expire_at = relative_to.to_owned() - TimeDelta::days(7); let mut tx = self.db.begin().await?; let tokens = tx.tokens().expire(&expire_at).await?; tx.commit().await?; for event in tokens.into_iter().map(types::TokenRevoked::from) { self.logins.broadcast(&event); } Ok(()) } pub async fn logout(&self, token: &token::Id) -> Result<(), ValidateError> { let mut tx = self.db.begin().await?; tx.tokens().revoke(token).await?; tx.commit().await?; self.logins .broadcast(&types::TokenRevoked::from(token.clone())); Ok(()) } } #[derive(Debug, thiserror::Error)] pub enum LoginError { #[error("invalid login")] Rejected, #[error(transparent)] DatabaseError(#[from] sqlx::Error), #[error(transparent)] PasswordHashError(#[from] password_hash::Error), } #[cfg(test)] #[derive(Debug, thiserror::Error)] #[error(transparent)] pub enum CreateError { DatabaseError(#[from] sqlx::Error), PasswordHashError(#[from] password_hash::Error), } #[derive(Debug, thiserror::Error)] pub enum ValidateError { #[error("invalid token")] InvalidToken, #[error(transparent)] DatabaseError(#[from] sqlx::Error), } #[derive(Debug)] enum GuardedEvent { TokenRevoked, Event(E), }