diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-20 23:27:59 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-20 23:27:59 -0400 |
| commit | a4dcc4b5c53966f3c4366e414a3e39d094f21404 (patch) | |
| tree | cc6b3ab3876bb05a81d03569aa6d9a42cdc3c6d1 /src/events | |
| parent | 0a05491930fb34ce7c93c33ea0b7599360483fc7 (diff) | |
Push the handling of the `Last-Event-Id` _format_ inside of `channels::app`.
This is intended to make it a bit more opaque to callers, and to free me up to experiment with the event ID format. It also makes event IDs tractable for testing.
Diffstat (limited to 'src/events')
| -rw-r--r-- | src/events/app.rs | 4 | ||||
| -rw-r--r-- | src/events/routes.rs | 77 |
2 files changed, 39 insertions, 42 deletions
diff --git a/src/events/app.rs b/src/events/app.rs index dfb23d7..c3a027d 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -54,7 +54,7 @@ impl Broadcaster { // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. - pub fn broadcast(&self, channel: &channel::Id, message: broadcast::Message) { + pub fn broadcast(&self, channel: &channel::Id, message: &broadcast::Message) { let tx = self.sender(channel); // Per the Tokio docs, the returned error is only used to indicate that @@ -64,7 +64,7 @@ impl Broadcaster { // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message); + let _ = tx.send(message.clone()); } // panic: if ``channel`` has not been previously registered, and was not diff --git a/src/events/routes.rs b/src/events/routes.rs index f880c70..ce5b778 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -9,7 +9,7 @@ use axum::{ Router, }; use axum_extra::extract::Query; -use chrono::{self, format::SecondsFormat, DateTime}; +use chrono::{self, format::SecondsFormat}; use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _}; use super::repo::broadcast; @@ -23,7 +23,7 @@ use crate::{ }; pub fn router() -> Router<App> { - Router::new().route("/api/events", get(on_events)) + Router::new().route("/api/events", get(events)) } #[derive(serde::Deserialize)] @@ -32,20 +32,14 @@ struct EventsQuery { channels: Vec<channel::Id>, } -async fn on_events( +async fn events( State(app): State<App>, RequestedAt(now): RequestedAt, _: Login, // requires auth, but doesn't actually care who you are last_event_id: Option<LastEventId>, Query(query): Query<EventsQuery>, -) -> Result<Events<impl Stream<Item = ChannelEvent<broadcast::Message>>>, ErrorResponse> { - let resume_at = last_event_id - .map(|LastEventId(header)| header) - .map(|header| DateTime::parse_from_rfc3339(&header)) - .transpose() - // impl From would take more code; this is used once. - .map_err(ErrorResponse::LastEventIdError)? - .map(|ts| ts.to_utc()); +) -> Result<Events<impl Stream<Item = ChannelEvent>>, ErrorResponse> { + let resume_at = last_event_id.as_deref(); let streams = stream::iter(query.channels) .then(|channel| { @@ -53,7 +47,7 @@ async fn on_events( async move { let events = app .channels() - .events(&channel, &now, resume_at.as_ref()) + .events(&channel, &now, resume_at) .await? .map(ChannelEvent::wrap(channel)); @@ -63,7 +57,7 @@ async fn on_events( .try_collect::<Vec<_>>() .await // impl From would take more code; this is used once. - .map_err(ErrorResponse::EventsError)?; + .map_err(ErrorResponse)?; let stream = stream::select_all(streams); @@ -74,60 +68,63 @@ struct Events<S>(S); impl<S> IntoResponse for Events<S> where - S: Stream<Item = ChannelEvent<broadcast::Message>> + Send + 'static, + S: Stream<Item = ChannelEvent> + Send + 'static, { fn into_response(self) -> Response { let Self(stream) = self; - let stream = stream.map(to_sse_event); + let stream = stream.map(sse::Event::try_from); Sse::new(stream) .keep_alive(sse::KeepAlive::default()) .into_response() } } -enum ErrorResponse { - EventsError(EventsError), - LastEventIdError(chrono::ParseError), -} +struct ErrorResponse(EventsError); impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { - match self { - Self::EventsError(not_found @ EventsError::ChannelNotFound(_)) => { + let Self(error) = self; + match error { + not_found @ EventsError::ChannelNotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } - Self::EventsError(other) => InternalError::from(other).into_response(), - Self::LastEventIdError(other) => { - (StatusCode::BAD_REQUEST, other.to_string()).into_response() + resume_at @ EventsError::ResumeAtError(_) => { + (StatusCode::BAD_REQUEST, resume_at.to_string()).into_response() } + other => InternalError::from(other).into_response(), } } } -fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, serde_json::Error> { - let data = serde_json::to_string_pretty(&event)?; - let event = sse::Event::default() - .id(event - .message - .sent_at - .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) - .data(&data); - - Ok(event) -} - #[derive(serde::Serialize)] -struct ChannelEvent<M> { +struct ChannelEvent { channel: channel::Id, #[serde(flatten)] - message: M, + message: broadcast::Message, } -impl<M> ChannelEvent<M> { - fn wrap(channel: channel::Id) -> impl Fn(M) -> Self { +impl ChannelEvent { + fn wrap(channel: channel::Id) -> impl Fn(broadcast::Message) -> Self { move |message| Self { channel: channel.clone(), message, } } + + fn event_id(&self) -> String { + self.message + .sent_at + .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true) + } +} + +impl TryFrom<ChannelEvent> for sse::Event { + type Error = serde_json::Error; + + fn try_from(value: ChannelEvent) -> Result<Self, Self::Error> { + let data = serde_json::to_string_pretty(&value)?; + let event = Self::default().id(value.event_id()).data(&data); + + Ok(event) + } } |
