summaryrefslogtreecommitdiff
path: root/src/events
diff options
context:
space:
mode:
Diffstat (limited to 'src/events')
-rw-r--r--src/events/routes.rs26
1 files changed, 23 insertions, 3 deletions
diff --git a/src/events/routes.rs b/src/events/routes.rs
index ec9dae2..f09474c 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -13,7 +13,11 @@ use super::{
extract::LastEventId,
types::{self, ResumePoint},
};
-use crate::{app::App, error::Internal, login::extract::Identity};
+use crate::{
+ app::App,
+ error::{Internal, Unauthorized},
+ login::{app::ValidateError, extract::Identity},
+};
#[cfg(test)]
mod test;
@@ -26,13 +30,13 @@ async fn events(
State(app): State<App>,
identity: Identity,
last_event_id: Option<LastEventId<ResumePoint>>,
-) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, Internal> {
+) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, EventsError> {
let resume_at = last_event_id
.map(LastEventId::into_inner)
.unwrap_or_default();
let stream = app.events().subscribe(resume_at).await?;
- let stream = app.logins().limit_stream(identity.token, stream);
+ let stream = app.logins().limit_stream(identity.token, stream).await?;
Ok(Events(stream))
}
@@ -67,3 +71,19 @@ impl TryFrom<types::ResumableEvent> for sse::Event {
Ok(event)
}
}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum EventsError {
+ DatabaseError(#[from] sqlx::Error),
+ ValidateError(#[from] ValidateError),
+}
+
+impl IntoResponse for EventsError {
+ fn into_response(self) -> Response {
+ match self {
+ Self::ValidateError(ValidateError::InvalidToken) => Unauthorized.into_response(),
+ other => Internal::from(other).into_response(),
+ }
+ }
+}