diff options
Diffstat (limited to 'src/events.rs')
| -rw-r--r-- | src/events.rs | 56 |
1 files changed, 47 insertions, 9 deletions
diff --git a/src/events.rs b/src/events.rs index 5d2dcf0..fd73d63 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,15 +1,16 @@ use axum::{ extract::State, + http::StatusCode, response::{ sse::{self, Sse}, - IntoResponse, + IntoResponse, Response, }, routing::get, Router, }; use axum_extra::extract::Query; -use chrono::{format::SecondsFormat, DateTime}; -use futures::stream::{self, StreamExt as _, TryStreamExt as _}; +use chrono::{self, format::SecondsFormat, DateTime}; +use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _}; use crate::{ app::App, @@ -34,11 +35,13 @@ async fn on_events( _: Login, // requires auth, but doesn't actually care who you are last_event_id: Option<LastEventId>, Query(query): Query<EventsQuery>, -) -> Result<impl IntoResponse, InternalError> { +) -> Result<Events<impl Stream<Item = ChannelEvent<broadcast::Message>>>, ErrorResponse> { let resume_at = last_event_id .map(|LastEventId(header)| header) .map(|header| DateTime::parse_from_rfc3339(&header)) - .transpose()? + .transpose() + // impl From would take more code; this is used once. + .map_err(ErrorResponse::LastEventIdError)? .map(|ts| ts.to_utc()); let streams = stream::iter(query.channels) @@ -55,12 +58,47 @@ async fn on_events( } }) .try_collect::<Vec<_>>() - .await?; + .await + // impl From would take more code; this is used once. + .map_err(ErrorResponse::EventsError)?; - let stream = stream::select_all(streams).map(to_sse_event); - let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default()); + let stream = stream::select_all(streams); - Ok(sse) + Ok(Events(stream)) +} + +struct Events<S>(S); + +impl<S> IntoResponse for Events<S> +where + S: Stream<Item = ChannelEvent<broadcast::Message>> + Send + 'static, +{ + fn into_response(self) -> Response { + let Self(stream) = self; + let stream = stream.map(to_sse_event); + Sse::new(stream) + .keep_alive(sse::KeepAlive::default()) + .into_response() + } +} + +enum ErrorResponse { + EventsError(EventsError), + LastEventIdError(chrono::ParseError), +} + +impl IntoResponse for ErrorResponse { + fn into_response(self) -> Response { + match self { + Self::EventsError(not_found @ EventsError::ChannelNotFound(_)) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + Self::EventsError(other) => InternalError::from(other).into_response(), + Self::LastEventIdError(other) => { + (StatusCode::BAD_REQUEST, other.to_string()).into_response() + } + } + } } fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, serde_json::Error> { |
