summaryrefslogtreecommitdiff
path: root/src/login
diff options
context:
space:
mode:
Diffstat (limited to 'src/login')
-rw-r--r--src/login/app.rs58
-rw-r--r--src/login/broadcaster.rs3
-rw-r--r--src/login/extract.rs74
-rw-r--r--src/login/mod.rs2
-rw-r--r--src/login/routes/test/login.rs4
-rw-r--r--src/login/types.rs12
6 files changed, 140 insertions, 13 deletions
diff --git a/src/login/app.rs b/src/login/app.rs
index f7fec88..b8916a8 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -1,24 +1,30 @@
use chrono::TimeDelta;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+ Stream,
+};
use sqlx::sqlite::SqlitePool;
-use super::{extract::IdentitySecret, repo::auth::Provider as _};
+use super::{broadcaster::Broadcaster, extract::IdentitySecret, repo::auth::Provider as _, types};
use crate::{
clock::DateTime,
password::Password,
repo::{
error::NotFound as _,
login::{Login, Provider as _},
- token::Provider as _,
+ token::{self, Provider as _},
},
};
pub struct Logins<'a> {
db: &'a SqlitePool,
+ logins: &'a Broadcaster,
}
impl<'a> Logins<'a> {
- pub const fn new(db: &'a SqlitePool) -> Self {
- Self { db }
+ pub const fn new(db: &'a SqlitePool, logins: &'a Broadcaster) -> Self {
+ Self { db, logins }
}
pub async fn login(
@@ -63,7 +69,7 @@ impl<'a> Logins<'a> {
&self,
secret: &IdentitySecret,
used_at: &DateTime,
- ) -> Result<Login, ValidateError> {
+ ) -> Result<(token::Id, Login), ValidateError> {
let mut tx = self.db.begin().await?;
let login = tx
.tokens()
@@ -75,26 +81,56 @@ impl<'a> Logins<'a> {
Ok(login)
}
+ pub fn limit_stream<E>(
+ &self,
+ token: token::Id,
+ events: impl Stream<Item = E> + std::fmt::Debug,
+ ) -> impl Stream<Item = E> + std::fmt::Debug
+ where
+ E: std::fmt::Debug,
+ {
+ let token_events = self
+ .logins
+ .subscribe()
+ .filter(move |event| future::ready(event.token == token))
+ .map(|_| GuardedEvent::TokenRevoked);
+
+ let events = events.map(|event| GuardedEvent::Event(event));
+
+ stream::select(token_events, events).scan((), |(), event| {
+ future::ready(match event {
+ GuardedEvent::Event(event) => Some(event),
+ GuardedEvent::TokenRevoked => None,
+ })
+ })
+ }
+
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
// Somewhat arbitrarily, expire after 7 days.
let expire_at = relative_to.to_owned() - TimeDelta::days(7);
let mut tx = self.db.begin().await?;
- tx.tokens().expire(&expire_at).await?;
+ let tokens = tx.tokens().expire(&expire_at).await?;
tx.commit().await?;
+ for event in tokens.into_iter().map(types::TokenRevoked::from) {
+ self.logins.broadcast(&event);
+ }
+
Ok(())
}
pub async fn logout(&self, secret: &IdentitySecret) -> Result<(), ValidateError> {
let mut tx = self.db.begin().await?;
- tx.tokens()
+ let token = tx
+ .tokens()
.revoke(secret)
.await
.not_found(|| ValidateError::InvalidToken)?;
-
tx.commit().await?;
+ self.logins.broadcast(&types::TokenRevoked::from(token));
+
Ok(())
}
}
@@ -124,3 +160,9 @@ pub enum ValidateError {
#[error(transparent)]
DatabaseError(#[from] sqlx::Error),
}
+
+#[derive(Debug)]
+enum GuardedEvent<E> {
+ TokenRevoked,
+ Event(E),
+}
diff --git a/src/login/broadcaster.rs b/src/login/broadcaster.rs
new file mode 100644
index 0000000..8e1fb3a
--- /dev/null
+++ b/src/login/broadcaster.rs
@@ -0,0 +1,3 @@
+use crate::{broadcast, login::types};
+
+pub type Broadcaster = broadcast::Broadcaster<types::TokenRevoked>;
diff --git a/src/login/extract.rs b/src/login/extract.rs
index 3b31d4c..b585565 100644
--- a/src/login/extract.rs
+++ b/src/login/extract.rs
@@ -1,12 +1,20 @@
use std::fmt;
use axum::{
- extract::FromRequestParts,
- http::request::Parts,
- response::{IntoResponseParts, ResponseParts},
+ extract::{FromRequestParts, State},
+ http::{request::Parts, StatusCode},
+ response::{IntoResponse, IntoResponseParts, Response, ResponseParts},
};
use axum_extra::extract::cookie::{Cookie, CookieJar};
+use crate::{
+ app::App,
+ clock::RequestedAt,
+ error::Internal,
+ login::app::ValidateError,
+ repo::{login::Login, token},
+};
+
// The usage pattern here - receive the extractor as an argument, return it in
// the response - is heavily modelled after CookieJar's own intended usage.
#[derive(Clone)]
@@ -112,3 +120,63 @@ where
Self(value.into())
}
}
+
+#[derive(Clone, Debug)]
+pub struct Identity {
+ pub token: token::Id,
+ pub login: Login,
+}
+
+#[async_trait::async_trait]
+impl FromRequestParts<App> for Identity {
+ type Rejection = LoginError<Internal>;
+
+ async fn from_request_parts(parts: &mut Parts, state: &App) -> Result<Self, Self::Rejection> {
+ // After Rust 1.82 (and #[feature(min_exhaustive_patterns)] lands on
+ // stable), the following can be replaced:
+ //
+ // ```
+ // let Ok(identity_token) = IdentityToken::from_request_parts(
+ // parts,
+ // state,
+ // ).await;
+ // ```
+ let identity_token = IdentityToken::from_request_parts(parts, state).await?;
+ let RequestedAt(used_at) = RequestedAt::from_request_parts(parts, state).await?;
+
+ let secret = identity_token.secret().ok_or(LoginError::Unauthorized)?;
+
+ let app = State::<App>::from_request_parts(parts, state).await?;
+ match app.logins().validate(&secret, &used_at).await {
+ Ok((token, login)) => Ok(Identity { token, login }),
+ Err(ValidateError::InvalidToken) => Err(LoginError::Unauthorized),
+ Err(other) => Err(other.into()),
+ }
+ }
+}
+
+pub enum LoginError<E> {
+ Failure(E),
+ Unauthorized,
+}
+
+impl<E> IntoResponse for LoginError<E>
+where
+ E: IntoResponse,
+{
+ fn into_response(self) -> Response {
+ match self {
+ Self::Unauthorized => (StatusCode::UNAUTHORIZED, "unauthorized").into_response(),
+ Self::Failure(e) => e.into_response(),
+ }
+ }
+}
+
+impl<E> From<E> for LoginError<Internal>
+where
+ E: Into<Internal>,
+{
+ fn from(err: E) -> Self {
+ Self::Failure(err.into())
+ }
+}
diff --git a/src/login/mod.rs b/src/login/mod.rs
index 191cce0..6ae82ac 100644
--- a/src/login/mod.rs
+++ b/src/login/mod.rs
@@ -1,6 +1,8 @@
pub use self::routes::router;
pub mod app;
+pub mod broadcaster;
pub mod extract;
mod repo;
mod routes;
+pub mod types;
diff --git a/src/login/routes/test/login.rs b/src/login/routes/test/login.rs
index 10c17d6..81653ff 100644
--- a/src/login/routes/test/login.rs
+++ b/src/login/routes/test/login.rs
@@ -36,7 +36,7 @@ async fn new_identity() {
// Verify the semantics
let validated_at = fixtures::now();
- let validated = app
+ let (_, validated) = app
.logins()
.validate(&secret, &validated_at)
.await
@@ -73,7 +73,7 @@ async fn existing_identity() {
// Verify the semantics
let validated_at = fixtures::now();
- let validated_login = app
+ let (_, validated_login) = app
.logins()
.validate(&secret, &validated_at)
.await
diff --git a/src/login/types.rs b/src/login/types.rs
new file mode 100644
index 0000000..7c7cbf9
--- /dev/null
+++ b/src/login/types.rs
@@ -0,0 +1,12 @@
+use crate::repo::token;
+
+#[derive(Clone, Debug)]
+pub struct TokenRevoked {
+ pub token: token::Id,
+}
+
+impl From<token::Id> for TokenRevoked {
+ fn from(token: token::Id) -> Self {
+ Self { token }
+ }
+}