diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-02 01:02:58 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-02 01:02:58 -0400 |
| commit | 5d3392799f88c5a3d3f9c656c73d6e8ac5c4d793 (patch) | |
| tree | 426c568d82b67a98095d25952d2b5b2345a6545b /src/token/app.rs | |
| parent | 357116366c1307bedaac6a3dfe9c5ed8e0e0c210 (diff) | |
Split login and token handling.
Diffstat (limited to 'src/token/app.rs')
| -rw-r--r-- | src/token/app.rs | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/src/token/app.rs b/src/token/app.rs new file mode 100644 index 0000000..1477a9f --- /dev/null +++ b/src/token/app.rs @@ -0,0 +1,170 @@ +use chrono::TimeDelta; +use futures::{ + future, + stream::{self, StreamExt as _}, + Stream, +}; +use sqlx::sqlite::SqlitePool; + +use super::{ + broadcaster::Broadcaster, event, repo::auth::Provider as _, repo::Provider as _, Id, Secret, +}; +use crate::{ + clock::DateTime, + login::{repo::Provider as _, Login, Password}, + repo::error::NotFound as _, +}; + +pub struct Tokens<'a> { + db: &'a SqlitePool, + tokens: &'a Broadcaster, +} + +impl<'a> Tokens<'a> { + pub const fn new(db: &'a SqlitePool, tokens: &'a Broadcaster) -> Self { + Self { db, tokens } + } + pub async fn login( + &self, + name: &str, + password: &Password, + login_at: &DateTime, + ) -> Result<Secret, LoginError> { + let mut tx = self.db.begin().await?; + + let login = if let Some((login, stored_hash)) = tx.auth().for_name(name).await? { + if stored_hash.verify(password)? { + // Password verified; use the login. + login + } else { + // Password NOT verified. + return Err(LoginError::Rejected); + } + } else { + let password_hash = password.hash()?; + tx.logins().create(name, &password_hash).await? + }; + + let token = tx.tokens().issue(&login, login_at).await?; + tx.commit().await?; + + Ok(token) + } + + pub async fn validate( + &self, + secret: &Secret, + used_at: &DateTime, + ) -> Result<(Id, Login), ValidateError> { + let mut tx = self.db.begin().await?; + let login = tx + .tokens() + .validate(secret, used_at) + .await + .not_found(|| ValidateError::InvalidToken)?; + tx.commit().await?; + + Ok(login) + } + + pub async fn limit_stream<E>( + &self, + token: Id, + events: impl Stream<Item = E> + std::fmt::Debug, + ) -> Result<impl Stream<Item = E> + std::fmt::Debug, ValidateError> + where + E: std::fmt::Debug, + { + // Subscribe, first. + let token_events = self.tokens.subscribe(); + + // Check that the token is valid at this point in time, second. If it is, then + // any future revocations will appear in the subscription. If not, bail now. + // + // It's possible, otherwise, to get to this point with a token that _was_ valid + // at the start of the request, but which was invalided _before_ the + // `subscribe()` call. In that case, the corresponding revocation event will + // simply be missed, since the `token_events` stream subscribed after the fact. + // This check cancels guarding the stream here. + // + // Yes, this is a weird niche edge case. Most things don't double-check, because + // they aren't expected to run long enough for the token's revocation to + // matter. Supervising a stream, on the other hand, will run for a + // _long_ time; if we miss the race here, we'll never actually carry out the + // supervision. + let mut tx = self.db.begin().await?; + tx.tokens() + .require(&token) + .await + .not_found(|| ValidateError::InvalidToken)?; + tx.commit().await?; + + // Then construct the guarded stream. First, project both streams into + // `GuardedEvent`. + let token_events = token_events + .filter(move |event| future::ready(event.token == token)) + .map(|_| GuardedEvent::TokenRevoked); + let events = events.map(|event| GuardedEvent::Event(event)); + + // Merge the two streams, then unproject them, stopping at + // `GuardedEvent::TokenRevoked`. + let stream = stream::select(token_events, events).scan((), |(), event| { + future::ready(match event { + GuardedEvent::Event(event) => Some(event), + GuardedEvent::TokenRevoked => None, + }) + }); + + Ok(stream) + } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 7 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + let tokens = tx.tokens().expire(&expire_at).await?; + tx.commit().await?; + + for event in tokens.into_iter().map(event::TokenRevoked::from) { + self.tokens.broadcast(&event); + } + + Ok(()) + } + + pub async fn logout(&self, token: &Id) -> Result<(), ValidateError> { + let mut tx = self.db.begin().await?; + tx.tokens().revoke(token).await?; + tx.commit().await?; + + self.tokens + .broadcast(&event::TokenRevoked::from(token.clone())); + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum LoginError { + #[error("invalid login")] + Rejected, + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), + #[error(transparent)] + PasswordHashError(#[from] password_hash::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum ValidateError { + #[error("invalid token")] + InvalidToken, + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + +#[derive(Debug)] +enum GuardedEvent<E> { + TokenRevoked, + Event(E), +} |
