summaryrefslogtreecommitdiff
path: root/src/login/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/login/app.rs')
-rw-r--r--src/login/app.rs58
1 files changed, 50 insertions, 8 deletions
diff --git a/src/login/app.rs b/src/login/app.rs
index f7fec88..b8916a8 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -1,24 +1,30 @@
use chrono::TimeDelta;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+ Stream,
+};
use sqlx::sqlite::SqlitePool;
-use super::{extract::IdentitySecret, repo::auth::Provider as _};
+use super::{broadcaster::Broadcaster, extract::IdentitySecret, repo::auth::Provider as _, types};
use crate::{
clock::DateTime,
password::Password,
repo::{
error::NotFound as _,
login::{Login, Provider as _},
- token::Provider as _,
+ token::{self, Provider as _},
},
};
pub struct Logins<'a> {
db: &'a SqlitePool,
+ logins: &'a Broadcaster,
}
impl<'a> Logins<'a> {
- pub const fn new(db: &'a SqlitePool) -> Self {
- Self { db }
+ pub const fn new(db: &'a SqlitePool, logins: &'a Broadcaster) -> Self {
+ Self { db, logins }
}
pub async fn login(
@@ -63,7 +69,7 @@ impl<'a> Logins<'a> {
&self,
secret: &IdentitySecret,
used_at: &DateTime,
- ) -> Result<Login, ValidateError> {
+ ) -> Result<(token::Id, Login), ValidateError> {
let mut tx = self.db.begin().await?;
let login = tx
.tokens()
@@ -75,26 +81,56 @@ impl<'a> Logins<'a> {
Ok(login)
}
+ pub fn limit_stream<E>(
+ &self,
+ token: token::Id,
+ events: impl Stream<Item = E> + std::fmt::Debug,
+ ) -> impl Stream<Item = E> + std::fmt::Debug
+ where
+ E: std::fmt::Debug,
+ {
+ let token_events = self
+ .logins
+ .subscribe()
+ .filter(move |event| future::ready(event.token == token))
+ .map(|_| GuardedEvent::TokenRevoked);
+
+ let events = events.map(|event| GuardedEvent::Event(event));
+
+ stream::select(token_events, events).scan((), |(), event| {
+ future::ready(match event {
+ GuardedEvent::Event(event) => Some(event),
+ GuardedEvent::TokenRevoked => None,
+ })
+ })
+ }
+
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
// Somewhat arbitrarily, expire after 7 days.
let expire_at = relative_to.to_owned() - TimeDelta::days(7);
let mut tx = self.db.begin().await?;
- tx.tokens().expire(&expire_at).await?;
+ let tokens = tx.tokens().expire(&expire_at).await?;
tx.commit().await?;
+ for event in tokens.into_iter().map(types::TokenRevoked::from) {
+ self.logins.broadcast(&event);
+ }
+
Ok(())
}
pub async fn logout(&self, secret: &IdentitySecret) -> Result<(), ValidateError> {
let mut tx = self.db.begin().await?;
- tx.tokens()
+ let token = tx
+ .tokens()
.revoke(secret)
.await
.not_found(|| ValidateError::InvalidToken)?;
-
tx.commit().await?;
+ self.logins.broadcast(&types::TokenRevoked::from(token));
+
Ok(())
}
}
@@ -124,3 +160,9 @@ pub enum ValidateError {
#[error(transparent)]
DatabaseError(#[from] sqlx::Error),
}
+
+#[derive(Debug)]
+enum GuardedEvent<E> {
+ TokenRevoked,
+ Event(E),
+}