diff options
Diffstat (limited to 'src/login/app.rs')
| -rw-r--r-- | src/login/app.rs | 43 |
1 files changed, 35 insertions, 8 deletions
diff --git a/src/login/app.rs b/src/login/app.rs index 182c62c..95f0a07 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -81,28 +81,55 @@ impl<'a> Logins<'a> { Ok(login) } - pub fn limit_stream<E>( + pub async fn limit_stream<E>( &self, token: token::Id, events: impl Stream<Item = E> + std::fmt::Debug, - ) -> impl Stream<Item = E> + std::fmt::Debug + ) -> Result<impl Stream<Item = E> + std::fmt::Debug, ValidateError> where E: std::fmt::Debug, { - let token_events = self - .logins - .subscribe() + // Subscribe, first. + let token_events = self.logins.subscribe(); + + // Check that the token is valid at this point in time, second. If it is, then + // any future revocations will appear in the subscription. If not, bail now. + // + // It's possible, otherwise, to get to this point with a token that _was_ valid + // at the start of the request, but which was invalided _before_ the + // `subscribe()` call. In that case, the corresponding revocation event will + // simply be missed, since the `token_events` stream subscribed after the fact. + // This check cancels guarding the stream here. + // + // Yes, this is a weird niche edge case. Most things don't double-check, because + // they aren't expected to run long enough for the token's revocation to + // matter. Supervising a stream, on the other hand, will run for a + // _long_ time; if we miss the race here, we'll never actually carry out the + // supervision. + let mut tx = self.db.begin().await?; + tx.tokens() + .require(&token) + .await + .not_found(|| ValidateError::InvalidToken)?; + tx.commit().await?; + + // Then construct the guarded stream. First, project both streams into + // `GuardedEvent`. + let token_events = token_events .filter(move |event| future::ready(event.token == token)) .map(|_| GuardedEvent::TokenRevoked); - let events = events.map(|event| GuardedEvent::Event(event)); - stream::select(token_events, events).scan((), |(), event| { + // Merge the two streams, then unproject them, stopping at + // `GuardedEvent::TokenRevoked`. + let stream = stream::select(token_events, events).scan((), |(), event| { future::ready(match event { GuardedEvent::Event(event) => Some(event), GuardedEvent::TokenRevoked => None, }) - }) + }); + + Ok(stream) } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { |
