diff options
Diffstat (limited to 'src/login/app.rs')
| -rw-r--r-- | src/login/app.rs | 58 |
1 files changed, 50 insertions, 8 deletions
diff --git a/src/login/app.rs b/src/login/app.rs index f7fec88..b8916a8 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -1,24 +1,30 @@ use chrono::TimeDelta; +use futures::{ + future, + stream::{self, StreamExt as _}, + Stream, +}; use sqlx::sqlite::SqlitePool; -use super::{extract::IdentitySecret, repo::auth::Provider as _}; +use super::{broadcaster::Broadcaster, extract::IdentitySecret, repo::auth::Provider as _, types}; use crate::{ clock::DateTime, password::Password, repo::{ error::NotFound as _, login::{Login, Provider as _}, - token::Provider as _, + token::{self, Provider as _}, }, }; pub struct Logins<'a> { db: &'a SqlitePool, + logins: &'a Broadcaster, } impl<'a> Logins<'a> { - pub const fn new(db: &'a SqlitePool) -> Self { - Self { db } + pub const fn new(db: &'a SqlitePool, logins: &'a Broadcaster) -> Self { + Self { db, logins } } pub async fn login( @@ -63,7 +69,7 @@ impl<'a> Logins<'a> { &self, secret: &IdentitySecret, used_at: &DateTime, - ) -> Result<Login, ValidateError> { + ) -> Result<(token::Id, Login), ValidateError> { let mut tx = self.db.begin().await?; let login = tx .tokens() @@ -75,26 +81,56 @@ impl<'a> Logins<'a> { Ok(login) } + pub fn limit_stream<E>( + &self, + token: token::Id, + events: impl Stream<Item = E> + std::fmt::Debug, + ) -> impl Stream<Item = E> + std::fmt::Debug + where + E: std::fmt::Debug, + { + let token_events = self + .logins + .subscribe() + .filter(move |event| future::ready(event.token == token)) + .map(|_| GuardedEvent::TokenRevoked); + + let events = events.map(|event| GuardedEvent::Event(event)); + + stream::select(token_events, events).scan((), |(), event| { + future::ready(match event { + GuardedEvent::Event(event) => Some(event), + GuardedEvent::TokenRevoked => None, + }) + }) + } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 7 days. let expire_at = relative_to.to_owned() - TimeDelta::days(7); let mut tx = self.db.begin().await?; - tx.tokens().expire(&expire_at).await?; + let tokens = tx.tokens().expire(&expire_at).await?; tx.commit().await?; + for event in tokens.into_iter().map(types::TokenRevoked::from) { + self.logins.broadcast(&event); + } + Ok(()) } pub async fn logout(&self, secret: &IdentitySecret) -> Result<(), ValidateError> { let mut tx = self.db.begin().await?; - tx.tokens() + let token = tx + .tokens() .revoke(secret) .await .not_found(|| ValidateError::InvalidToken)?; - tx.commit().await?; + self.logins.broadcast(&types::TokenRevoked::from(token)); + Ok(()) } } @@ -124,3 +160,9 @@ pub enum ValidateError { #[error(transparent)] DatabaseError(#[from] sqlx::Error), } + +#[derive(Debug)] +enum GuardedEvent<E> { + TokenRevoked, + Event(E), +} |
