diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-01 20:32:57 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-01 20:32:57 -0400 |
| commit | 7645411bcf7201e3a4927566da78080dc6a84ccf (patch) | |
| tree | 2711922bfeab6dc8b6494e9b0976f3f051dff4a9 /src/events | |
| parent | 6c054c5b8d43a818ccfa9087960dc19b286e6bb7 (diff) | |
Prevent racing between `limit_stream` and logging out.
Diffstat (limited to 'src/events')
| -rw-r--r-- | src/events/routes.rs | 26 |
1 files changed, 23 insertions, 3 deletions
diff --git a/src/events/routes.rs b/src/events/routes.rs index ec9dae2..f09474c 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -13,7 +13,11 @@ use super::{ extract::LastEventId, types::{self, ResumePoint}, }; -use crate::{app::App, error::Internal, login::extract::Identity}; +use crate::{ + app::App, + error::{Internal, Unauthorized}, + login::{app::ValidateError, extract::Identity}, +}; #[cfg(test)] mod test; @@ -26,13 +30,13 @@ async fn events( State(app): State<App>, identity: Identity, last_event_id: Option<LastEventId<ResumePoint>>, -) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, Internal> { +) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, EventsError> { let resume_at = last_event_id .map(LastEventId::into_inner) .unwrap_or_default(); let stream = app.events().subscribe(resume_at).await?; - let stream = app.logins().limit_stream(identity.token, stream); + let stream = app.logins().limit_stream(identity.token, stream).await?; Ok(Events(stream)) } @@ -67,3 +71,19 @@ impl TryFrom<types::ResumableEvent> for sse::Event { Ok(event) } } + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum EventsError { + DatabaseError(#[from] sqlx::Error), + ValidateError(#[from] ValidateError), +} + +impl IntoResponse for EventsError { + fn into_response(self) -> Response { + match self { + Self::ValidateError(ValidateError::InvalidToken) => Unauthorized.into_response(), + other => Internal::from(other).into_response(), + } + } +} |
