summaryrefslogtreecommitdiff
path: root/src/events
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-20 23:27:59 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-20 23:27:59 -0400
commita4dcc4b5c53966f3c4366e414a3e39d094f21404 (patch)
treecc6b3ab3876bb05a81d03569aa6d9a42cdc3c6d1 /src/events
parent0a05491930fb34ce7c93c33ea0b7599360483fc7 (diff)
Push the handling of the `Last-Event-Id` _format_ inside of `channels::app`.
This is intended to make it a bit more opaque to callers, and to free me up to experiment with the event ID format. It also makes event IDs tractable for testing.
Diffstat (limited to 'src/events')
-rw-r--r--src/events/app.rs4
-rw-r--r--src/events/routes.rs77
2 files changed, 39 insertions, 42 deletions
diff --git a/src/events/app.rs b/src/events/app.rs
index dfb23d7..c3a027d 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -54,7 +54,7 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn broadcast(&self, channel: &channel::Id, message: broadcast::Message) {
+ pub fn broadcast(&self, channel: &channel::Id, message: &broadcast::Message) {
let tx = self.sender(channel);
// Per the Tokio docs, the returned error is only used to indicate that
@@ -64,7 +64,7 @@ impl Broadcaster {
//
// The successful return value, which includes the number of active
// receivers, also isn't that interesting to us.
- let _ = tx.send(message);
+ let _ = tx.send(message.clone());
}
// panic: if ``channel`` has not been previously registered, and was not
diff --git a/src/events/routes.rs b/src/events/routes.rs
index f880c70..ce5b778 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -9,7 +9,7 @@ use axum::{
Router,
};
use axum_extra::extract::Query;
-use chrono::{self, format::SecondsFormat, DateTime};
+use chrono::{self, format::SecondsFormat};
use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _};
use super::repo::broadcast;
@@ -23,7 +23,7 @@ use crate::{
};
pub fn router() -> Router<App> {
- Router::new().route("/api/events", get(on_events))
+ Router::new().route("/api/events", get(events))
}
#[derive(serde::Deserialize)]
@@ -32,20 +32,14 @@ struct EventsQuery {
channels: Vec<channel::Id>,
}
-async fn on_events(
+async fn events(
State(app): State<App>,
RequestedAt(now): RequestedAt,
_: Login, // requires auth, but doesn't actually care who you are
last_event_id: Option<LastEventId>,
Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = ChannelEvent<broadcast::Message>>>, ErrorResponse> {
- let resume_at = last_event_id
- .map(|LastEventId(header)| header)
- .map(|header| DateTime::parse_from_rfc3339(&header))
- .transpose()
- // impl From would take more code; this is used once.
- .map_err(ErrorResponse::LastEventIdError)?
- .map(|ts| ts.to_utc());
+) -> Result<Events<impl Stream<Item = ChannelEvent>>, ErrorResponse> {
+ let resume_at = last_event_id.as_deref();
let streams = stream::iter(query.channels)
.then(|channel| {
@@ -53,7 +47,7 @@ async fn on_events(
async move {
let events = app
.channels()
- .events(&channel, &now, resume_at.as_ref())
+ .events(&channel, &now, resume_at)
.await?
.map(ChannelEvent::wrap(channel));
@@ -63,7 +57,7 @@ async fn on_events(
.try_collect::<Vec<_>>()
.await
// impl From would take more code; this is used once.
- .map_err(ErrorResponse::EventsError)?;
+ .map_err(ErrorResponse)?;
let stream = stream::select_all(streams);
@@ -74,60 +68,63 @@ struct Events<S>(S);
impl<S> IntoResponse for Events<S>
where
- S: Stream<Item = ChannelEvent<broadcast::Message>> + Send + 'static,
+ S: Stream<Item = ChannelEvent> + Send + 'static,
{
fn into_response(self) -> Response {
let Self(stream) = self;
- let stream = stream.map(to_sse_event);
+ let stream = stream.map(sse::Event::try_from);
Sse::new(stream)
.keep_alive(sse::KeepAlive::default())
.into_response()
}
}
-enum ErrorResponse {
- EventsError(EventsError),
- LastEventIdError(chrono::ParseError),
-}
+struct ErrorResponse(EventsError);
impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
- match self {
- Self::EventsError(not_found @ EventsError::ChannelNotFound(_)) => {
+ let Self(error) = self;
+ match error {
+ not_found @ EventsError::ChannelNotFound(_) => {
(StatusCode::NOT_FOUND, not_found.to_string()).into_response()
}
- Self::EventsError(other) => InternalError::from(other).into_response(),
- Self::LastEventIdError(other) => {
- (StatusCode::BAD_REQUEST, other.to_string()).into_response()
+ resume_at @ EventsError::ResumeAtError(_) => {
+ (StatusCode::BAD_REQUEST, resume_at.to_string()).into_response()
}
+ other => InternalError::from(other).into_response(),
}
}
}
-fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, serde_json::Error> {
- let data = serde_json::to_string_pretty(&event)?;
- let event = sse::Event::default()
- .id(event
- .message
- .sent_at
- .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
- .data(&data);
-
- Ok(event)
-}
-
#[derive(serde::Serialize)]
-struct ChannelEvent<M> {
+struct ChannelEvent {
channel: channel::Id,
#[serde(flatten)]
- message: M,
+ message: broadcast::Message,
}
-impl<M> ChannelEvent<M> {
- fn wrap(channel: channel::Id) -> impl Fn(M) -> Self {
+impl ChannelEvent {
+ fn wrap(channel: channel::Id) -> impl Fn(broadcast::Message) -> Self {
move |message| Self {
channel: channel.clone(),
message,
}
}
+
+ fn event_id(&self) -> String {
+ self.message
+ .sent_at
+ .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)
+ }
+}
+
+impl TryFrom<ChannelEvent> for sse::Event {
+ type Error = serde_json::Error;
+
+ fn try_from(value: ChannelEvent) -> Result<Self, Self::Error> {
+ let data = serde_json::to_string_pretty(&value)?;
+ let event = Self::default().id(value.event_id()).data(&data);
+
+ Ok(event)
+ }
}