summaryrefslogtreecommitdiff
path: root/src/events.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events.rs')
-rw-r--r--src/events.rs56
1 files changed, 47 insertions, 9 deletions
diff --git a/src/events.rs b/src/events.rs
index 5d2dcf0..fd73d63 100644
--- a/src/events.rs
+++ b/src/events.rs
@@ -1,15 +1,16 @@
use axum::{
extract::State,
+ http::StatusCode,
response::{
sse::{self, Sse},
- IntoResponse,
+ IntoResponse, Response,
},
routing::get,
Router,
};
use axum_extra::extract::Query;
-use chrono::{format::SecondsFormat, DateTime};
-use futures::stream::{self, StreamExt as _, TryStreamExt as _};
+use chrono::{self, format::SecondsFormat, DateTime};
+use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _};
use crate::{
app::App,
@@ -34,11 +35,13 @@ async fn on_events(
_: Login, // requires auth, but doesn't actually care who you are
last_event_id: Option<LastEventId>,
Query(query): Query<EventsQuery>,
-) -> Result<impl IntoResponse, InternalError> {
+) -> Result<Events<impl Stream<Item = ChannelEvent<broadcast::Message>>>, ErrorResponse> {
let resume_at = last_event_id
.map(|LastEventId(header)| header)
.map(|header| DateTime::parse_from_rfc3339(&header))
- .transpose()?
+ .transpose()
+ // impl From would take more code; this is used once.
+ .map_err(ErrorResponse::LastEventIdError)?
.map(|ts| ts.to_utc());
let streams = stream::iter(query.channels)
@@ -55,12 +58,47 @@ async fn on_events(
}
})
.try_collect::<Vec<_>>()
- .await?;
+ .await
+ // impl From would take more code; this is used once.
+ .map_err(ErrorResponse::EventsError)?;
- let stream = stream::select_all(streams).map(to_sse_event);
- let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default());
+ let stream = stream::select_all(streams);
- Ok(sse)
+ Ok(Events(stream))
+}
+
+struct Events<S>(S);
+
+impl<S> IntoResponse for Events<S>
+where
+ S: Stream<Item = ChannelEvent<broadcast::Message>> + Send + 'static,
+{
+ fn into_response(self) -> Response {
+ let Self(stream) = self;
+ let stream = stream.map(to_sse_event);
+ Sse::new(stream)
+ .keep_alive(sse::KeepAlive::default())
+ .into_response()
+ }
+}
+
+enum ErrorResponse {
+ EventsError(EventsError),
+ LastEventIdError(chrono::ParseError),
+}
+
+impl IntoResponse for ErrorResponse {
+ fn into_response(self) -> Response {
+ match self {
+ Self::EventsError(not_found @ EventsError::ChannelNotFound(_)) => {
+ (StatusCode::NOT_FOUND, not_found.to_string()).into_response()
+ }
+ Self::EventsError(other) => InternalError::from(other).into_response(),
+ Self::LastEventIdError(other) => {
+ (StatusCode::BAD_REQUEST, other.to_string()).into_response()
+ }
+ }
+ }
}
fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, serde_json::Error> {