diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-18 12:16:43 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-18 12:18:58 -0400 |
| commit | 9fb4d3e561786f01352cbd14894d994ea537b5ec (patch) | |
| tree | 959ddcf4592c6140b71be38149051c56a788bcfe /src/events.rs | |
| parent | 2b4cf5c62ff82fa408a4f82bde0b561ff3b15497 (diff) | |
Return 404s when resources are not found.
This is implemented by making the return values, in most cases, idiosyncratic ad-hoc types that then convert to the approprate error response. This also should make endpoints more testable, since the return values can now be inspected to check their properties without having to process or parse an HTTP response.
Diffstat (limited to 'src/events.rs')
| -rw-r--r-- | src/events.rs | 56 |
1 files changed, 47 insertions, 9 deletions
diff --git a/src/events.rs b/src/events.rs index 5d2dcf0..fd73d63 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,15 +1,16 @@ use axum::{ extract::State, + http::StatusCode, response::{ sse::{self, Sse}, - IntoResponse, + IntoResponse, Response, }, routing::get, Router, }; use axum_extra::extract::Query; -use chrono::{format::SecondsFormat, DateTime}; -use futures::stream::{self, StreamExt as _, TryStreamExt as _}; +use chrono::{self, format::SecondsFormat, DateTime}; +use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _}; use crate::{ app::App, @@ -34,11 +35,13 @@ async fn on_events( _: Login, // requires auth, but doesn't actually care who you are last_event_id: Option<LastEventId>, Query(query): Query<EventsQuery>, -) -> Result<impl IntoResponse, InternalError> { +) -> Result<Events<impl Stream<Item = ChannelEvent<broadcast::Message>>>, ErrorResponse> { let resume_at = last_event_id .map(|LastEventId(header)| header) .map(|header| DateTime::parse_from_rfc3339(&header)) - .transpose()? + .transpose() + // impl From would take more code; this is used once. + .map_err(ErrorResponse::LastEventIdError)? .map(|ts| ts.to_utc()); let streams = stream::iter(query.channels) @@ -55,12 +58,47 @@ async fn on_events( } }) .try_collect::<Vec<_>>() - .await?; + .await + // impl From would take more code; this is used once. + .map_err(ErrorResponse::EventsError)?; - let stream = stream::select_all(streams).map(to_sse_event); - let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default()); + let stream = stream::select_all(streams); - Ok(sse) + Ok(Events(stream)) +} + +struct Events<S>(S); + +impl<S> IntoResponse for Events<S> +where + S: Stream<Item = ChannelEvent<broadcast::Message>> + Send + 'static, +{ + fn into_response(self) -> Response { + let Self(stream) = self; + let stream = stream.map(to_sse_event); + Sse::new(stream) + .keep_alive(sse::KeepAlive::default()) + .into_response() + } +} + +enum ErrorResponse { + EventsError(EventsError), + LastEventIdError(chrono::ParseError), +} + +impl IntoResponse for ErrorResponse { + fn into_response(self) -> Response { + match self { + Self::EventsError(not_found @ EventsError::ChannelNotFound(_)) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + Self::EventsError(other) => InternalError::from(other).into_response(), + Self::LastEventIdError(other) => { + (StatusCode::BAD_REQUEST, other.to_string()).into_response() + } + } + } } fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, serde_json::Error> { |
