From 7645411bcf7201e3a4927566da78080dc6a84ccf Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 1 Oct 2024 20:32:57 -0400 Subject: Prevent racing between `limit_stream` and logging out. --- src/events/routes.rs | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) (limited to 'src/events') diff --git a/src/events/routes.rs b/src/events/routes.rs index ec9dae2..f09474c 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -13,7 +13,11 @@ use super::{ extract::LastEventId, types::{self, ResumePoint}, }; -use crate::{app::App, error::Internal, login::extract::Identity}; +use crate::{ + app::App, + error::{Internal, Unauthorized}, + login::{app::ValidateError, extract::Identity}, +}; #[cfg(test)] mod test; @@ -26,13 +30,13 @@ async fn events( State(app): State, identity: Identity, last_event_id: Option>, -) -> Result + std::fmt::Debug>, Internal> { +) -> Result + std::fmt::Debug>, EventsError> { let resume_at = last_event_id .map(LastEventId::into_inner) .unwrap_or_default(); let stream = app.events().subscribe(resume_at).await?; - let stream = app.logins().limit_stream(identity.token, stream); + let stream = app.logins().limit_stream(identity.token, stream).await?; Ok(Events(stream)) } @@ -67,3 +71,19 @@ impl TryFrom for sse::Event { Ok(event) } } + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum EventsError { + DatabaseError(#[from] sqlx::Error), + ValidateError(#[from] ValidateError), +} + +impl IntoResponse for EventsError { + fn into_response(self) -> Response { + match self { + Self::ValidateError(ValidateError::InvalidToken) => Unauthorized.into_response(), + other => Internal::from(other).into_response(), + } + } +} -- cgit v1.2.3