use chrono::TimeDelta; use futures::{ Stream, future, stream::{self, StreamExt as _}, }; use sqlx::sqlite::SqlitePool; use super::{ Broadcaster, Event as TokenEvent, Secret, Token, extract::Identity, repo::Provider as _, }; use crate::{ clock::DateTime, db, db::NotFound as _, error::failed::{Failed, ResultExt as _}, push::repo::Provider as _, }; pub struct Tokens { db: SqlitePool, token_events: Broadcaster, } impl Tokens { pub const fn new(db: SqlitePool, token_events: Broadcaster) -> Self { Self { db, token_events } } pub async fn validate( &self, secret: &Secret, used_at: &DateTime, ) -> Result { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let (token, login) = tx .tokens() .validate(secret, used_at) .await .optional() .fail("Failed to load token")? .ok_or(ValidateError::InvalidToken)?; tx.commit().await.fail(db::failed::COMMIT)?; Ok(Identity { token, login }) } pub async fn limit_stream( &self, token: &Token, events: S, ) -> Result + std::fmt::Debug + use, ValidateError> where S: Stream + std::fmt::Debug, E: std::fmt::Debug, { let token = token.id.clone(); // Subscribe, first. let token_events = self.token_events.subscribe(); // Check that the token is valid at this point in time, second. If it is, then // any future revocations will appear in the subscription. If not, bail now. // // It's possible, otherwise, to get to this point with a token that _was_ valid // at the start of the request, but which was invalided _before_ the // `subscribe()` call. In that case, the corresponding revocation event will // simply be missed, since the `token_events` stream subscribed after the fact. // This check cancels guarding the stream here. // // Yes, this is a weird niche edge case. Most things don't double-check, because // they aren't expected to run long enough for the token's revocation to // matter. Supervising a stream, on the other hand, will run for a // _long_ time; if we miss the race here, we'll never actually carry out the // supervision. let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; tx.tokens() .require(&token) .await .optional() .fail("Failed to load token")? .ok_or(ValidateError::InvalidToken)?; tx.commit().await.fail(db::failed::COMMIT)?; // Then construct the guarded stream. First, project both streams into // `GuardedEvent`. let token_events = token_events .filter(move |event| { future::ready(matches!(event, TokenEvent::Revoked(id) if id == &token)) }) .map(|_| GuardedEvent::TokenRevoked); let events = events.map(|event| GuardedEvent::Event(event)); // Merge the two streams, then unproject them, stopping at // `GuardedEvent::TokenRevoked`. let stream = stream::select(token_events, events).scan((), |(), event| { future::ready(match event { GuardedEvent::Event(event) => Some(event), GuardedEvent::TokenRevoked => None, }) }); Ok(stream) } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 7 days. let expire_at = relative_to.to_owned() - TimeDelta::days(7); let mut tx = self.db.begin().await?; let tokens = tx.tokens().expire(&expire_at).await?; tx.commit().await?; tokens .into_iter() .map(TokenEvent::Revoked) .for_each(|event| self.token_events.broadcast(event)); Ok(()) } pub async fn logout(&self, token: &Token) -> Result<(), ValidateError> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; tx.push() .unsubscribe_token(token) .await .fail("Failed to remove push subscriptions")?; tx.tokens() .revoke(token) .await .fail("Failed to revoke token")?; tx.commit().await.fail(db::failed::COMMIT)?; self.token_events .broadcast(TokenEvent::Revoked(token.id.clone())); Ok(()) } } #[derive(Debug, thiserror::Error)] pub enum ValidateError { #[error("invalid token")] InvalidToken, #[error(transparent)] Failed(#[from] Failed), } #[derive(Debug)] enum GuardedEvent { TokenRevoked, Event(E), }