summaryrefslogtreecommitdiff
path: root/src/token/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/token/app.rs')
-rw-r--r--src/token/app.rs49
1 files changed, 35 insertions, 14 deletions
diff --git a/src/token/app.rs b/src/token/app.rs
index 5c4fcd5..b8af637 100644
--- a/src/token/app.rs
+++ b/src/token/app.rs
@@ -7,23 +7,34 @@ use futures::{
use sqlx::sqlite::SqlitePool;
use super::{
- broadcaster::Broadcaster, event, repo::auth::Provider as _, repo::Provider as _, Id, Secret,
+ repo::auth::Provider as _, repo::Provider as _, Broadcaster, Event as TokenEvent, Id, Secret,
};
use crate::{
clock::DateTime,
db::NotFound as _,
+ event::{self, repo::Provider as _, Event as ServiceEvent},
login::{repo::Provider as _, Login, Password},
};
pub struct Tokens<'a> {
db: &'a SqlitePool,
- tokens: &'a Broadcaster,
+ events: &'a event::Broadcaster,
+ token_events: &'a Broadcaster,
}
impl<'a> Tokens<'a> {
- pub const fn new(db: &'a SqlitePool, tokens: &'a Broadcaster) -> Self {
- Self { db, tokens }
+ pub const fn new(
+ db: &'a SqlitePool,
+ events: &'a event::Broadcaster,
+ token_events: &'a Broadcaster,
+ ) -> Self {
+ Self {
+ db,
+ events,
+ token_events,
+ }
}
+
pub async fn login(
&self,
name: &str,
@@ -32,22 +43,30 @@ impl<'a> Tokens<'a> {
) -> Result<Secret, LoginError> {
let mut tx = self.db.begin().await?;
- let login = if let Some((login, stored_hash)) = tx.auth().for_name(name).await? {
+ let (login, created) = if let Some((login, stored_hash)) = tx.auth().for_name(name).await? {
if stored_hash.verify(password)? {
- // Password verified; use the login.
- login
+ // Password verified, proceed with login
+ (login, false)
} else {
// Password NOT verified.
return Err(LoginError::Rejected);
}
} else {
let password_hash = password.hash()?;
- tx.logins().create(name, &password_hash).await?
+ let created = tx.sequence().next(login_at).await?;
+ let login = tx.logins().create(name, &password_hash, &created).await?;
+
+ (login, true)
};
let token = tx.tokens().issue(&login, login_at).await?;
tx.commit().await?;
+ if created {
+ self.events
+ .broadcast(login.events().map(ServiceEvent::from).collect::<Vec<_>>());
+ }
+
Ok(token)
}
@@ -76,7 +95,7 @@ impl<'a> Tokens<'a> {
E: std::fmt::Debug,
{
// Subscribe, first.
- let token_events = self.tokens.subscribe();
+ let token_events = self.token_events.subscribe();
// Check that the token is valid at this point in time, second. If it is, then
// any future revocations will appear in the subscription. If not, bail now.
@@ -102,7 +121,9 @@ impl<'a> Tokens<'a> {
// Then construct the guarded stream. First, project both streams into
// `GuardedEvent`.
let token_events = token_events
- .filter(move |event| future::ready(event.token == token))
+ .filter(move |event| {
+ future::ready(matches!(event, TokenEvent::Revoked(id) if id == &token))
+ })
.map(|_| GuardedEvent::TokenRevoked);
let events = events.map(|event| GuardedEvent::Event(event));
@@ -126,8 +147,8 @@ impl<'a> Tokens<'a> {
let tokens = tx.tokens().expire(&expire_at).await?;
tx.commit().await?;
- for event in tokens.into_iter().map(event::TokenRevoked::from) {
- self.tokens.broadcast(event);
+ for event in tokens.into_iter().map(TokenEvent::Revoked) {
+ self.token_events.broadcast(event);
}
Ok(())
@@ -138,8 +159,8 @@ impl<'a> Tokens<'a> {
tx.tokens().revoke(token).await?;
tx.commit().await?;
- self.tokens
- .broadcast(event::TokenRevoked::from(token.clone()));
+ self.token_events
+ .broadcast(TokenEvent::Revoked(token.clone()));
Ok(())
}