diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-20 23:27:59 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-20 23:27:59 -0400 |
| commit | a4dcc4b5c53966f3c4366e414a3e39d094f21404 (patch) | |
| tree | cc6b3ab3876bb05a81d03569aa6d9a42cdc3c6d1 | |
| parent | 0a05491930fb34ce7c93c33ea0b7599360483fc7 (diff) | |
Push the handling of the `Last-Event-Id` _format_ inside of `channels::app`.
This is intended to make it a bit more opaque to callers, and to free me up to experiment with the event ID format. It also makes event IDs tractable for testing.
| -rw-r--r-- | src/channel/app.rs | 27 | ||||
| -rw-r--r-- | src/events/app.rs | 4 | ||||
| -rw-r--r-- | src/events/routes.rs | 77 | ||||
| -rw-r--r-- | src/header.rs | 15 |
4 files changed, 71 insertions, 52 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index f9a75d7..48e3e3c 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -56,7 +56,7 @@ impl<'a> Channels<'a> { channel: &channel::Id, body: &str, sent_at: &DateTime, - ) -> Result<(), EventsError> { + ) -> Result<broadcast::Message, EventsError> { let mut tx = self.db.begin().await?; let channel = tx .channels() @@ -69,34 +69,39 @@ impl<'a> Channels<'a> { .await?; tx.commit().await?; - self.broadcaster.broadcast(&channel.id, message); - Ok(()) + self.broadcaster.broadcast(&channel.id, &message); + Ok(message) } pub async fn events( &self, channel: &channel::Id, subscribed_at: &DateTime, - resume_at: Option<&DateTime>, + resume_at: Option<&str>, ) -> Result<impl Stream<Item = broadcast::Message>, EventsError> { // Somewhat arbitrarily, expire after 90 days. let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); - let mut tx = self - .db - .begin() + let resume_at = resume_at + .map(chrono::DateTime::parse_from_rfc3339) + .transpose()? + .map(|resume_at| resume_at.to_utc()); + + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let channel = tx.channels().by_id(channel).await?; let live_messages = self .broadcaster .listen(&channel.id) - .filter(Self::skip_stale(resume_at)) + .filter(Self::skip_stale(resume_at.as_ref())) .filter(Self::skip_expired(&expire_at)); tx.broadcast().expire(&expire_at).await?; - let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; + let stored_messages = tx.broadcast().replay(&channel, resume_at.as_ref()).await?; tx.commit().await?; let stored_messages = stream::iter(stored_messages); @@ -154,5 +159,7 @@ pub enum EventsError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), #[error(transparent)] + ResumeAtError(#[from] chrono::ParseError), + #[error(transparent)] DatabaseError(#[from] sqlx::Error), } diff --git a/src/events/app.rs b/src/events/app.rs index dfb23d7..c3a027d 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -54,7 +54,7 @@ impl Broadcaster { // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. - pub fn broadcast(&self, channel: &channel::Id, message: broadcast::Message) { + pub fn broadcast(&self, channel: &channel::Id, message: &broadcast::Message) { let tx = self.sender(channel); // Per the Tokio docs, the returned error is only used to indicate that @@ -64,7 +64,7 @@ impl Broadcaster { // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message); + let _ = tx.send(message.clone()); } // panic: if ``channel`` has not been previously registered, and was not diff --git a/src/events/routes.rs b/src/events/routes.rs index f880c70..ce5b778 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -9,7 +9,7 @@ use axum::{ Router, }; use axum_extra::extract::Query; -use chrono::{self, format::SecondsFormat, DateTime}; +use chrono::{self, format::SecondsFormat}; use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _}; use super::repo::broadcast; @@ -23,7 +23,7 @@ use crate::{ }; pub fn router() -> Router<App> { - Router::new().route("/api/events", get(on_events)) + Router::new().route("/api/events", get(events)) } #[derive(serde::Deserialize)] @@ -32,20 +32,14 @@ struct EventsQuery { channels: Vec<channel::Id>, } -async fn on_events( +async fn events( State(app): State<App>, RequestedAt(now): RequestedAt, _: Login, // requires auth, but doesn't actually care who you are last_event_id: Option<LastEventId>, Query(query): Query<EventsQuery>, -) -> Result<Events<impl Stream<Item = ChannelEvent<broadcast::Message>>>, ErrorResponse> { - let resume_at = last_event_id - .map(|LastEventId(header)| header) - .map(|header| DateTime::parse_from_rfc3339(&header)) - .transpose() - // impl From would take more code; this is used once. - .map_err(ErrorResponse::LastEventIdError)? - .map(|ts| ts.to_utc()); +) -> Result<Events<impl Stream<Item = ChannelEvent>>, ErrorResponse> { + let resume_at = last_event_id.as_deref(); let streams = stream::iter(query.channels) .then(|channel| { @@ -53,7 +47,7 @@ async fn on_events( async move { let events = app .channels() - .events(&channel, &now, resume_at.as_ref()) + .events(&channel, &now, resume_at) .await? .map(ChannelEvent::wrap(channel)); @@ -63,7 +57,7 @@ async fn on_events( .try_collect::<Vec<_>>() .await // impl From would take more code; this is used once. - .map_err(ErrorResponse::EventsError)?; + .map_err(ErrorResponse)?; let stream = stream::select_all(streams); @@ -74,60 +68,63 @@ struct Events<S>(S); impl<S> IntoResponse for Events<S> where - S: Stream<Item = ChannelEvent<broadcast::Message>> + Send + 'static, + S: Stream<Item = ChannelEvent> + Send + 'static, { fn into_response(self) -> Response { let Self(stream) = self; - let stream = stream.map(to_sse_event); + let stream = stream.map(sse::Event::try_from); Sse::new(stream) .keep_alive(sse::KeepAlive::default()) .into_response() } } -enum ErrorResponse { - EventsError(EventsError), - LastEventIdError(chrono::ParseError), -} +struct ErrorResponse(EventsError); impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { - match self { - Self::EventsError(not_found @ EventsError::ChannelNotFound(_)) => { + let Self(error) = self; + match error { + not_found @ EventsError::ChannelNotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } - Self::EventsError(other) => InternalError::from(other).into_response(), - Self::LastEventIdError(other) => { - (StatusCode::BAD_REQUEST, other.to_string()).into_response() + resume_at @ EventsError::ResumeAtError(_) => { + (StatusCode::BAD_REQUEST, resume_at.to_string()).into_response() } + other => InternalError::from(other).into_response(), } } } -fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, serde_json::Error> { - let data = serde_json::to_string_pretty(&event)?; - let event = sse::Event::default() - .id(event - .message - .sent_at - .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) - .data(&data); - - Ok(event) -} - #[derive(serde::Serialize)] -struct ChannelEvent<M> { +struct ChannelEvent { channel: channel::Id, #[serde(flatten)] - message: M, + message: broadcast::Message, } -impl<M> ChannelEvent<M> { - fn wrap(channel: channel::Id) -> impl Fn(M) -> Self { +impl ChannelEvent { + fn wrap(channel: channel::Id) -> impl Fn(broadcast::Message) -> Self { move |message| Self { channel: channel.clone(), message, } } + + fn event_id(&self) -> String { + self.message + .sent_at + .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true) + } +} + +impl TryFrom<ChannelEvent> for sse::Event { + type Error = serde_json::Error; + + fn try_from(value: ChannelEvent) -> Result<Self, Self::Error> { + let data = serde_json::to_string_pretty(&value)?; + let event = Self::default().id(value.event_id()).data(&data); + + Ok(event) + } } diff --git a/src/header.rs b/src/header.rs index 904e29d..61cc561 100644 --- a/src/header.rs +++ b/src/header.rs @@ -56,3 +56,18 @@ where Ok(requested_at) } } + +impl From<String> for LastEventId { + fn from(header: String) -> Self { + Self(header) + } +} + +impl std::ops::Deref for LastEventId { + type Target = str; + + fn deref(&self) -> &Self::Target { + let Self(header) = self; + header + } +} |
