From a4dcc4b5c53966f3c4366e414a3e39d094f21404 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 20 Sep 2024 23:27:59 -0400 Subject: Push the handling of the `Last-Event-Id` _format_ inside of `channels::app`. This is intended to make it a bit more opaque to callers, and to free me up to experiment with the event ID format. It also makes event IDs tractable for testing. --- src/events/app.rs | 4 +-- src/events/routes.rs | 77 +++++++++++++++++++++++++--------------------------- 2 files changed, 39 insertions(+), 42 deletions(-) (limited to 'src/events') diff --git a/src/events/app.rs b/src/events/app.rs index dfb23d7..c3a027d 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -54,7 +54,7 @@ impl Broadcaster { // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. - pub fn broadcast(&self, channel: &channel::Id, message: broadcast::Message) { + pub fn broadcast(&self, channel: &channel::Id, message: &broadcast::Message) { let tx = self.sender(channel); // Per the Tokio docs, the returned error is only used to indicate that @@ -64,7 +64,7 @@ impl Broadcaster { // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message); + let _ = tx.send(message.clone()); } // panic: if ``channel`` has not been previously registered, and was not diff --git a/src/events/routes.rs b/src/events/routes.rs index f880c70..ce5b778 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -9,7 +9,7 @@ use axum::{ Router, }; use axum_extra::extract::Query; -use chrono::{self, format::SecondsFormat, DateTime}; +use chrono::{self, format::SecondsFormat}; use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _}; use super::repo::broadcast; @@ -23,7 +23,7 @@ use crate::{ }; pub fn router() -> Router { - Router::new().route("/api/events", get(on_events)) + Router::new().route("/api/events", get(events)) } #[derive(serde::Deserialize)] @@ -32,20 +32,14 @@ struct EventsQuery { channels: Vec, } -async fn on_events( +async fn events( State(app): State, RequestedAt(now): RequestedAt, _: Login, // requires auth, but doesn't actually care who you are last_event_id: Option, Query(query): Query, -) -> Result>>, ErrorResponse> { - let resume_at = last_event_id - .map(|LastEventId(header)| header) - .map(|header| DateTime::parse_from_rfc3339(&header)) - .transpose() - // impl From would take more code; this is used once. - .map_err(ErrorResponse::LastEventIdError)? - .map(|ts| ts.to_utc()); +) -> Result>, ErrorResponse> { + let resume_at = last_event_id.as_deref(); let streams = stream::iter(query.channels) .then(|channel| { @@ -53,7 +47,7 @@ async fn on_events( async move { let events = app .channels() - .events(&channel, &now, resume_at.as_ref()) + .events(&channel, &now, resume_at) .await? .map(ChannelEvent::wrap(channel)); @@ -63,7 +57,7 @@ async fn on_events( .try_collect::>() .await // impl From would take more code; this is used once. - .map_err(ErrorResponse::EventsError)?; + .map_err(ErrorResponse)?; let stream = stream::select_all(streams); @@ -74,60 +68,63 @@ struct Events(S); impl IntoResponse for Events where - S: Stream> + Send + 'static, + S: Stream + Send + 'static, { fn into_response(self) -> Response { let Self(stream) = self; - let stream = stream.map(to_sse_event); + let stream = stream.map(sse::Event::try_from); Sse::new(stream) .keep_alive(sse::KeepAlive::default()) .into_response() } } -enum ErrorResponse { - EventsError(EventsError), - LastEventIdError(chrono::ParseError), -} +struct ErrorResponse(EventsError); impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { - match self { - Self::EventsError(not_found @ EventsError::ChannelNotFound(_)) => { + let Self(error) = self; + match error { + not_found @ EventsError::ChannelNotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } - Self::EventsError(other) => InternalError::from(other).into_response(), - Self::LastEventIdError(other) => { - (StatusCode::BAD_REQUEST, other.to_string()).into_response() + resume_at @ EventsError::ResumeAtError(_) => { + (StatusCode::BAD_REQUEST, resume_at.to_string()).into_response() } + other => InternalError::from(other).into_response(), } } } -fn to_sse_event(event: ChannelEvent) -> Result { - let data = serde_json::to_string_pretty(&event)?; - let event = sse::Event::default() - .id(event - .message - .sent_at - .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) - .data(&data); - - Ok(event) -} - #[derive(serde::Serialize)] -struct ChannelEvent { +struct ChannelEvent { channel: channel::Id, #[serde(flatten)] - message: M, + message: broadcast::Message, } -impl ChannelEvent { - fn wrap(channel: channel::Id) -> impl Fn(M) -> Self { +impl ChannelEvent { + fn wrap(channel: channel::Id) -> impl Fn(broadcast::Message) -> Self { move |message| Self { channel: channel.clone(), message, } } + + fn event_id(&self) -> String { + self.message + .sent_at + .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true) + } +} + +impl TryFrom for sse::Event { + type Error = serde_json::Error; + + fn try_from(value: ChannelEvent) -> Result { + let data = serde_json::to_string_pretty(&value)?; + let event = Self::default().id(value.event_id()).data(&data); + + Ok(event) + } } -- cgit v1.2.3