summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-20 23:27:59 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-20 23:27:59 -0400
commita4dcc4b5c53966f3c4366e414a3e39d094f21404 (patch)
treecc6b3ab3876bb05a81d03569aa6d9a42cdc3c6d1 /src
parent0a05491930fb34ce7c93c33ea0b7599360483fc7 (diff)
Push the handling of the `Last-Event-Id` _format_ inside of `channels::app`.
This is intended to make it a bit more opaque to callers, and to free me up to experiment with the event ID format. It also makes event IDs tractable for testing.
Diffstat (limited to 'src')
-rw-r--r--src/channel/app.rs27
-rw-r--r--src/events/app.rs4
-rw-r--r--src/events/routes.rs77
-rw-r--r--src/header.rs15
4 files changed, 71 insertions, 52 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index f9a75d7..48e3e3c 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -56,7 +56,7 @@ impl<'a> Channels<'a> {
channel: &channel::Id,
body: &str,
sent_at: &DateTime,
- ) -> Result<(), EventsError> {
+ ) -> Result<broadcast::Message, EventsError> {
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
@@ -69,34 +69,39 @@ impl<'a> Channels<'a> {
.await?;
tx.commit().await?;
- self.broadcaster.broadcast(&channel.id, message);
- Ok(())
+ self.broadcaster.broadcast(&channel.id, &message);
+ Ok(message)
}
pub async fn events(
&self,
channel: &channel::Id,
subscribed_at: &DateTime,
- resume_at: Option<&DateTime>,
+ resume_at: Option<&str>,
) -> Result<impl Stream<Item = broadcast::Message>, EventsError> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);
- let mut tx = self
- .db
- .begin()
+ let resume_at = resume_at
+ .map(chrono::DateTime::parse_from_rfc3339)
+ .transpose()?
+ .map(|resume_at| resume_at.to_utc());
+
+ let mut tx = self.db.begin().await?;
+ let channel = tx
+ .channels()
+ .by_id(channel)
.await
.not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
- let channel = tx.channels().by_id(channel).await?;
let live_messages = self
.broadcaster
.listen(&channel.id)
- .filter(Self::skip_stale(resume_at))
+ .filter(Self::skip_stale(resume_at.as_ref()))
.filter(Self::skip_expired(&expire_at));
tx.broadcast().expire(&expire_at).await?;
- let stored_messages = tx.broadcast().replay(&channel, resume_at).await?;
+ let stored_messages = tx.broadcast().replay(&channel, resume_at.as_ref()).await?;
tx.commit().await?;
let stored_messages = stream::iter(stored_messages);
@@ -154,5 +159,7 @@ pub enum EventsError {
#[error("channel {0} not found")]
ChannelNotFound(channel::Id),
#[error(transparent)]
+ ResumeAtError(#[from] chrono::ParseError),
+ #[error(transparent)]
DatabaseError(#[from] sqlx::Error),
}
diff --git a/src/events/app.rs b/src/events/app.rs
index dfb23d7..c3a027d 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -54,7 +54,7 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn broadcast(&self, channel: &channel::Id, message: broadcast::Message) {
+ pub fn broadcast(&self, channel: &channel::Id, message: &broadcast::Message) {
let tx = self.sender(channel);
// Per the Tokio docs, the returned error is only used to indicate that
@@ -64,7 +64,7 @@ impl Broadcaster {
//
// The successful return value, which includes the number of active
// receivers, also isn't that interesting to us.
- let _ = tx.send(message);
+ let _ = tx.send(message.clone());
}
// panic: if ``channel`` has not been previously registered, and was not
diff --git a/src/events/routes.rs b/src/events/routes.rs
index f880c70..ce5b778 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -9,7 +9,7 @@ use axum::{
Router,
};
use axum_extra::extract::Query;
-use chrono::{self, format::SecondsFormat, DateTime};
+use chrono::{self, format::SecondsFormat};
use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _};
use super::repo::broadcast;
@@ -23,7 +23,7 @@ use crate::{
};
pub fn router() -> Router<App> {
- Router::new().route("/api/events", get(on_events))
+ Router::new().route("/api/events", get(events))
}
#[derive(serde::Deserialize)]
@@ -32,20 +32,14 @@ struct EventsQuery {
channels: Vec<channel::Id>,
}
-async fn on_events(
+async fn events(
State(app): State<App>,
RequestedAt(now): RequestedAt,
_: Login, // requires auth, but doesn't actually care who you are
last_event_id: Option<LastEventId>,
Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = ChannelEvent<broadcast::Message>>>, ErrorResponse> {
- let resume_at = last_event_id
- .map(|LastEventId(header)| header)
- .map(|header| DateTime::parse_from_rfc3339(&header))
- .transpose()
- // impl From would take more code; this is used once.
- .map_err(ErrorResponse::LastEventIdError)?
- .map(|ts| ts.to_utc());
+) -> Result<Events<impl Stream<Item = ChannelEvent>>, ErrorResponse> {
+ let resume_at = last_event_id.as_deref();
let streams = stream::iter(query.channels)
.then(|channel| {
@@ -53,7 +47,7 @@ async fn on_events(
async move {
let events = app
.channels()
- .events(&channel, &now, resume_at.as_ref())
+ .events(&channel, &now, resume_at)
.await?
.map(ChannelEvent::wrap(channel));
@@ -63,7 +57,7 @@ async fn on_events(
.try_collect::<Vec<_>>()
.await
// impl From would take more code; this is used once.
- .map_err(ErrorResponse::EventsError)?;
+ .map_err(ErrorResponse)?;
let stream = stream::select_all(streams);
@@ -74,60 +68,63 @@ struct Events<S>(S);
impl<S> IntoResponse for Events<S>
where
- S: Stream<Item = ChannelEvent<broadcast::Message>> + Send + 'static,
+ S: Stream<Item = ChannelEvent> + Send + 'static,
{
fn into_response(self) -> Response {
let Self(stream) = self;
- let stream = stream.map(to_sse_event);
+ let stream = stream.map(sse::Event::try_from);
Sse::new(stream)
.keep_alive(sse::KeepAlive::default())
.into_response()
}
}
-enum ErrorResponse {
- EventsError(EventsError),
- LastEventIdError(chrono::ParseError),
-}
+struct ErrorResponse(EventsError);
impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
- match self {
- Self::EventsError(not_found @ EventsError::ChannelNotFound(_)) => {
+ let Self(error) = self;
+ match error {
+ not_found @ EventsError::ChannelNotFound(_) => {
(StatusCode::NOT_FOUND, not_found.to_string()).into_response()
}
- Self::EventsError(other) => InternalError::from(other).into_response(),
- Self::LastEventIdError(other) => {
- (StatusCode::BAD_REQUEST, other.to_string()).into_response()
+ resume_at @ EventsError::ResumeAtError(_) => {
+ (StatusCode::BAD_REQUEST, resume_at.to_string()).into_response()
}
+ other => InternalError::from(other).into_response(),
}
}
}
-fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, serde_json::Error> {
- let data = serde_json::to_string_pretty(&event)?;
- let event = sse::Event::default()
- .id(event
- .message
- .sent_at
- .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
- .data(&data);
-
- Ok(event)
-}
-
#[derive(serde::Serialize)]
-struct ChannelEvent<M> {
+struct ChannelEvent {
channel: channel::Id,
#[serde(flatten)]
- message: M,
+ message: broadcast::Message,
}
-impl<M> ChannelEvent<M> {
- fn wrap(channel: channel::Id) -> impl Fn(M) -> Self {
+impl ChannelEvent {
+ fn wrap(channel: channel::Id) -> impl Fn(broadcast::Message) -> Self {
move |message| Self {
channel: channel.clone(),
message,
}
}
+
+ fn event_id(&self) -> String {
+ self.message
+ .sent_at
+ .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)
+ }
+}
+
+impl TryFrom<ChannelEvent> for sse::Event {
+ type Error = serde_json::Error;
+
+ fn try_from(value: ChannelEvent) -> Result<Self, Self::Error> {
+ let data = serde_json::to_string_pretty(&value)?;
+ let event = Self::default().id(value.event_id()).data(&data);
+
+ Ok(event)
+ }
}
diff --git a/src/header.rs b/src/header.rs
index 904e29d..61cc561 100644
--- a/src/header.rs
+++ b/src/header.rs
@@ -56,3 +56,18 @@ where
Ok(requested_at)
}
}
+
+impl From<String> for LastEventId {
+ fn from(header: String) -> Self {
+ Self(header)
+ }
+}
+
+impl std::ops::Deref for LastEventId {
+ type Target = str;
+
+ fn deref(&self) -> &Self::Target {
+ let Self(header) = self;
+ header
+ }
+}