summaryrefslogtreecommitdiff
path: root/src/login
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-28 21:55:20 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-29 01:19:19 -0400
commit0b1cb80dd0b0f90c4892de7e7a2d18a076ecbdf2 (patch)
treeb41313dbd92811ffcc87b0af576dc570b5802a1e /src/login
parent4d0bb0709b168a24ab6a8dbc86da45d7503596ee (diff)
Shut down the `/api/events` stream when the user logs out or their token expires.
When tokens are revoked (logout or expiry), the server now publishes an internal event via the new `logins` event broadcaster. These events are used to guard the `/api/events` stream. When a token revocation event arrives for the token used to subscribe to the stream, the stream is cut short, disconnecting the client. In service of this, tokens now have IDs, which are non-confidential values that can be used to discuss tokens without their secrets being passed around unnecessarily. These IDs are not (at this time) exposed to clients, but they could be.
Diffstat (limited to 'src/login')
-rw-r--r--src/login/app.rs58
-rw-r--r--src/login/broadcaster.rs3
-rw-r--r--src/login/extract.rs74
-rw-r--r--src/login/mod.rs2
-rw-r--r--src/login/routes/test/login.rs4
-rw-r--r--src/login/types.rs12
6 files changed, 140 insertions, 13 deletions
diff --git a/src/login/app.rs b/src/login/app.rs
index f7fec88..b8916a8 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -1,24 +1,30 @@
use chrono::TimeDelta;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+ Stream,
+};
use sqlx::sqlite::SqlitePool;
-use super::{extract::IdentitySecret, repo::auth::Provider as _};
+use super::{broadcaster::Broadcaster, extract::IdentitySecret, repo::auth::Provider as _, types};
use crate::{
clock::DateTime,
password::Password,
repo::{
error::NotFound as _,
login::{Login, Provider as _},
- token::Provider as _,
+ token::{self, Provider as _},
},
};
pub struct Logins<'a> {
db: &'a SqlitePool,
+ logins: &'a Broadcaster,
}
impl<'a> Logins<'a> {
- pub const fn new(db: &'a SqlitePool) -> Self {
- Self { db }
+ pub const fn new(db: &'a SqlitePool, logins: &'a Broadcaster) -> Self {
+ Self { db, logins }
}
pub async fn login(
@@ -63,7 +69,7 @@ impl<'a> Logins<'a> {
&self,
secret: &IdentitySecret,
used_at: &DateTime,
- ) -> Result<Login, ValidateError> {
+ ) -> Result<(token::Id, Login), ValidateError> {
let mut tx = self.db.begin().await?;
let login = tx
.tokens()
@@ -75,26 +81,56 @@ impl<'a> Logins<'a> {
Ok(login)
}
+ pub fn limit_stream<E>(
+ &self,
+ token: token::Id,
+ events: impl Stream<Item = E> + std::fmt::Debug,
+ ) -> impl Stream<Item = E> + std::fmt::Debug
+ where
+ E: std::fmt::Debug,
+ {
+ let token_events = self
+ .logins
+ .subscribe()
+ .filter(move |event| future::ready(event.token == token))
+ .map(|_| GuardedEvent::TokenRevoked);
+
+ let events = events.map(|event| GuardedEvent::Event(event));
+
+ stream::select(token_events, events).scan((), |(), event| {
+ future::ready(match event {
+ GuardedEvent::Event(event) => Some(event),
+ GuardedEvent::TokenRevoked => None,
+ })
+ })
+ }
+
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
// Somewhat arbitrarily, expire after 7 days.
let expire_at = relative_to.to_owned() - TimeDelta::days(7);
let mut tx = self.db.begin().await?;
- tx.tokens().expire(&expire_at).await?;
+ let tokens = tx.tokens().expire(&expire_at).await?;
tx.commit().await?;
+ for event in tokens.into_iter().map(types::TokenRevoked::from) {
+ self.logins.broadcast(&event);
+ }
+
Ok(())
}
pub async fn logout(&self, secret: &IdentitySecret) -> Result<(), ValidateError> {
let mut tx = self.db.begin().await?;
- tx.tokens()
+ let token = tx
+ .tokens()
.revoke(secret)
.await
.not_found(|| ValidateError::InvalidToken)?;
-
tx.commit().await?;
+ self.logins.broadcast(&types::TokenRevoked::from(token));
+
Ok(())
}
}
@@ -124,3 +160,9 @@ pub enum ValidateError {
#[error(transparent)]
DatabaseError(#[from] sqlx::Error),
}
+
+#[derive(Debug)]
+enum GuardedEvent<E> {
+ TokenRevoked,
+ Event(E),
+}
diff --git a/src/login/broadcaster.rs b/src/login/broadcaster.rs
new file mode 100644
index 0000000..8e1fb3a
--- /dev/null
+++ b/src/login/broadcaster.rs
@@ -0,0 +1,3 @@
+use crate::{broadcast, login::types};
+
+pub type Broadcaster = broadcast::Broadcaster<types::TokenRevoked>;
diff --git a/src/login/extract.rs b/src/login/extract.rs
index 3b31d4c..b585565 100644
--- a/src/login/extract.rs
+++ b/src/login/extract.rs
@@ -1,12 +1,20 @@
use std::fmt;
use axum::{
- extract::FromRequestParts,
- http::request::Parts,
- response::{IntoResponseParts, ResponseParts},
+ extract::{FromRequestParts, State},
+ http::{request::Parts, StatusCode},
+ response::{IntoResponse, IntoResponseParts, Response, ResponseParts},
};
use axum_extra::extract::cookie::{Cookie, CookieJar};
+use crate::{
+ app::App,
+ clock::RequestedAt,
+ error::Internal,
+ login::app::ValidateError,
+ repo::{login::Login, token},
+};
+
// The usage pattern here - receive the extractor as an argument, return it in
// the response - is heavily modelled after CookieJar's own intended usage.
#[derive(Clone)]
@@ -112,3 +120,63 @@ where
Self(value.into())
}
}
+
+#[derive(Clone, Debug)]
+pub struct Identity {
+ pub token: token::Id,
+ pub login: Login,
+}
+
+#[async_trait::async_trait]
+impl FromRequestParts<App> for Identity {
+ type Rejection = LoginError<Internal>;
+
+ async fn from_request_parts(parts: &mut Parts, state: &App) -> Result<Self, Self::Rejection> {
+ // After Rust 1.82 (and #[feature(min_exhaustive_patterns)] lands on
+ // stable), the following can be replaced:
+ //
+ // ```
+ // let Ok(identity_token) = IdentityToken::from_request_parts(
+ // parts,
+ // state,
+ // ).await;
+ // ```
+ let identity_token = IdentityToken::from_request_parts(parts, state).await?;
+ let RequestedAt(used_at) = RequestedAt::from_request_parts(parts, state).await?;
+
+ let secret = identity_token.secret().ok_or(LoginError::Unauthorized)?;
+
+ let app = State::<App>::from_request_parts(parts, state).await?;
+ match app.logins().validate(&secret, &used_at).await {
+ Ok((token, login)) => Ok(Identity { token, login }),
+ Err(ValidateError::InvalidToken) => Err(LoginError::Unauthorized),
+ Err(other) => Err(other.into()),
+ }
+ }
+}
+
+pub enum LoginError<E> {
+ Failure(E),
+ Unauthorized,
+}
+
+impl<E> IntoResponse for LoginError<E>
+where
+ E: IntoResponse,
+{
+ fn into_response(self) -> Response {
+ match self {
+ Self::Unauthorized => (StatusCode::UNAUTHORIZED, "unauthorized").into_response(),
+ Self::Failure(e) => e.into_response(),
+ }
+ }
+}
+
+impl<E> From<E> for LoginError<Internal>
+where
+ E: Into<Internal>,
+{
+ fn from(err: E) -> Self {
+ Self::Failure(err.into())
+ }
+}
diff --git a/src/login/mod.rs b/src/login/mod.rs
index 191cce0..6ae82ac 100644
--- a/src/login/mod.rs
+++ b/src/login/mod.rs
@@ -1,6 +1,8 @@
pub use self::routes::router;
pub mod app;
+pub mod broadcaster;
pub mod extract;
mod repo;
mod routes;
+pub mod types;
diff --git a/src/login/routes/test/login.rs b/src/login/routes/test/login.rs
index 10c17d6..81653ff 100644
--- a/src/login/routes/test/login.rs
+++ b/src/login/routes/test/login.rs
@@ -36,7 +36,7 @@ async fn new_identity() {
// Verify the semantics
let validated_at = fixtures::now();
- let validated = app
+ let (_, validated) = app
.logins()
.validate(&secret, &validated_at)
.await
@@ -73,7 +73,7 @@ async fn existing_identity() {
// Verify the semantics
let validated_at = fixtures::now();
- let validated_login = app
+ let (_, validated_login) = app
.logins()
.validate(&secret, &validated_at)
.await
diff --git a/src/login/types.rs b/src/login/types.rs
new file mode 100644
index 0000000..7c7cbf9
--- /dev/null
+++ b/src/login/types.rs
@@ -0,0 +1,12 @@
+use crate::repo::token;
+
+#[derive(Clone, Debug)]
+pub struct TokenRevoked {
+ pub token: token::Id,
+}
+
+impl From<token::Id> for TokenRevoked {
+ fn from(token: token::Id) -> Self {
+ Self { token }
+ }
+}