summaryrefslogtreecommitdiff
path: root/src/login/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-01 20:32:57 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-01 20:32:57 -0400
commit7645411bcf7201e3a4927566da78080dc6a84ccf (patch)
tree2711922bfeab6dc8b6494e9b0976f3f051dff4a9 /src/login/app.rs
parent6c054c5b8d43a818ccfa9087960dc19b286e6bb7 (diff)
Prevent racing between `limit_stream` and logging out.
Diffstat (limited to 'src/login/app.rs')
-rw-r--r--src/login/app.rs43
1 files changed, 35 insertions, 8 deletions
diff --git a/src/login/app.rs b/src/login/app.rs
index 182c62c..95f0a07 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -81,28 +81,55 @@ impl<'a> Logins<'a> {
Ok(login)
}
- pub fn limit_stream<E>(
+ pub async fn limit_stream<E>(
&self,
token: token::Id,
events: impl Stream<Item = E> + std::fmt::Debug,
- ) -> impl Stream<Item = E> + std::fmt::Debug
+ ) -> Result<impl Stream<Item = E> + std::fmt::Debug, ValidateError>
where
E: std::fmt::Debug,
{
- let token_events = self
- .logins
- .subscribe()
+ // Subscribe, first.
+ let token_events = self.logins.subscribe();
+
+ // Check that the token is valid at this point in time, second. If it is, then
+ // any future revocations will appear in the subscription. If not, bail now.
+ //
+ // It's possible, otherwise, to get to this point with a token that _was_ valid
+ // at the start of the request, but which was invalided _before_ the
+ // `subscribe()` call. In that case, the corresponding revocation event will
+ // simply be missed, since the `token_events` stream subscribed after the fact.
+ // This check cancels guarding the stream here.
+ //
+ // Yes, this is a weird niche edge case. Most things don't double-check, because
+ // they aren't expected to run long enough for the token's revocation to
+ // matter. Supervising a stream, on the other hand, will run for a
+ // _long_ time; if we miss the race here, we'll never actually carry out the
+ // supervision.
+ let mut tx = self.db.begin().await?;
+ tx.tokens()
+ .require(&token)
+ .await
+ .not_found(|| ValidateError::InvalidToken)?;
+ tx.commit().await?;
+
+ // Then construct the guarded stream. First, project both streams into
+ // `GuardedEvent`.
+ let token_events = token_events
.filter(move |event| future::ready(event.token == token))
.map(|_| GuardedEvent::TokenRevoked);
-
let events = events.map(|event| GuardedEvent::Event(event));
- stream::select(token_events, events).scan((), |(), event| {
+ // Merge the two streams, then unproject them, stopping at
+ // `GuardedEvent::TokenRevoked`.
+ let stream = stream::select(token_events, events).scan((), |(), event| {
future::ready(match event {
GuardedEvent::Event(event) => Some(event),
GuardedEvent::TokenRevoked => None,
})
- })
+ });
+
+ Ok(stream)
}
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {