summaryrefslogtreecommitdiff
path: root/src/token/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-02 01:02:58 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-02 01:02:58 -0400
commit5d3392799f88c5a3d3f9c656c73d6e8ac5c4d793 (patch)
tree426c568d82b67a98095d25952d2b5b2345a6545b /src/token/app.rs
parent357116366c1307bedaac6a3dfe9c5ed8e0e0c210 (diff)
Split login and token handling.
Diffstat (limited to 'src/token/app.rs')
-rw-r--r--src/token/app.rs170
1 files changed, 170 insertions, 0 deletions
diff --git a/src/token/app.rs b/src/token/app.rs
new file mode 100644
index 0000000..1477a9f
--- /dev/null
+++ b/src/token/app.rs
@@ -0,0 +1,170 @@
+use chrono::TimeDelta;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+ Stream,
+};
+use sqlx::sqlite::SqlitePool;
+
+use super::{
+ broadcaster::Broadcaster, event, repo::auth::Provider as _, repo::Provider as _, Id, Secret,
+};
+use crate::{
+ clock::DateTime,
+ login::{repo::Provider as _, Login, Password},
+ repo::error::NotFound as _,
+};
+
+pub struct Tokens<'a> {
+ db: &'a SqlitePool,
+ tokens: &'a Broadcaster,
+}
+
+impl<'a> Tokens<'a> {
+ pub const fn new(db: &'a SqlitePool, tokens: &'a Broadcaster) -> Self {
+ Self { db, tokens }
+ }
+ pub async fn login(
+ &self,
+ name: &str,
+ password: &Password,
+ login_at: &DateTime,
+ ) -> Result<Secret, LoginError> {
+ let mut tx = self.db.begin().await?;
+
+ let login = if let Some((login, stored_hash)) = tx.auth().for_name(name).await? {
+ if stored_hash.verify(password)? {
+ // Password verified; use the login.
+ login
+ } else {
+ // Password NOT verified.
+ return Err(LoginError::Rejected);
+ }
+ } else {
+ let password_hash = password.hash()?;
+ tx.logins().create(name, &password_hash).await?
+ };
+
+ let token = tx.tokens().issue(&login, login_at).await?;
+ tx.commit().await?;
+
+ Ok(token)
+ }
+
+ pub async fn validate(
+ &self,
+ secret: &Secret,
+ used_at: &DateTime,
+ ) -> Result<(Id, Login), ValidateError> {
+ let mut tx = self.db.begin().await?;
+ let login = tx
+ .tokens()
+ .validate(secret, used_at)
+ .await
+ .not_found(|| ValidateError::InvalidToken)?;
+ tx.commit().await?;
+
+ Ok(login)
+ }
+
+ pub async fn limit_stream<E>(
+ &self,
+ token: Id,
+ events: impl Stream<Item = E> + std::fmt::Debug,
+ ) -> Result<impl Stream<Item = E> + std::fmt::Debug, ValidateError>
+ where
+ E: std::fmt::Debug,
+ {
+ // Subscribe, first.
+ let token_events = self.tokens.subscribe();
+
+ // Check that the token is valid at this point in time, second. If it is, then
+ // any future revocations will appear in the subscription. If not, bail now.
+ //
+ // It's possible, otherwise, to get to this point with a token that _was_ valid
+ // at the start of the request, but which was invalided _before_ the
+ // `subscribe()` call. In that case, the corresponding revocation event will
+ // simply be missed, since the `token_events` stream subscribed after the fact.
+ // This check cancels guarding the stream here.
+ //
+ // Yes, this is a weird niche edge case. Most things don't double-check, because
+ // they aren't expected to run long enough for the token's revocation to
+ // matter. Supervising a stream, on the other hand, will run for a
+ // _long_ time; if we miss the race here, we'll never actually carry out the
+ // supervision.
+ let mut tx = self.db.begin().await?;
+ tx.tokens()
+ .require(&token)
+ .await
+ .not_found(|| ValidateError::InvalidToken)?;
+ tx.commit().await?;
+
+ // Then construct the guarded stream. First, project both streams into
+ // `GuardedEvent`.
+ let token_events = token_events
+ .filter(move |event| future::ready(event.token == token))
+ .map(|_| GuardedEvent::TokenRevoked);
+ let events = events.map(|event| GuardedEvent::Event(event));
+
+ // Merge the two streams, then unproject them, stopping at
+ // `GuardedEvent::TokenRevoked`.
+ let stream = stream::select(token_events, events).scan((), |(), event| {
+ future::ready(match event {
+ GuardedEvent::Event(event) => Some(event),
+ GuardedEvent::TokenRevoked => None,
+ })
+ });
+
+ Ok(stream)
+ }
+
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, expire after 7 days.
+ let expire_at = relative_to.to_owned() - TimeDelta::days(7);
+
+ let mut tx = self.db.begin().await?;
+ let tokens = tx.tokens().expire(&expire_at).await?;
+ tx.commit().await?;
+
+ for event in tokens.into_iter().map(event::TokenRevoked::from) {
+ self.tokens.broadcast(&event);
+ }
+
+ Ok(())
+ }
+
+ pub async fn logout(&self, token: &Id) -> Result<(), ValidateError> {
+ let mut tx = self.db.begin().await?;
+ tx.tokens().revoke(token).await?;
+ tx.commit().await?;
+
+ self.tokens
+ .broadcast(&event::TokenRevoked::from(token.clone()));
+
+ Ok(())
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum LoginError {
+ #[error("invalid login")]
+ Rejected,
+ #[error(transparent)]
+ DatabaseError(#[from] sqlx::Error),
+ #[error(transparent)]
+ PasswordHashError(#[from] password_hash::Error),
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum ValidateError {
+ #[error("invalid token")]
+ InvalidToken,
+ #[error(transparent)]
+ DatabaseError(#[from] sqlx::Error),
+}
+
+#[derive(Debug)]
+enum GuardedEvent<E> {
+ TokenRevoked,
+ Event(E),
+}