summaryrefslogtreecommitdiff
path: root/src/events/routes.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/routes.rs')
-rw-r--r--src/events/routes.rs66
1 files changed, 50 insertions, 16 deletions
diff --git a/src/events/routes.rs b/src/events/routes.rs
index a6bf5d9..7731680 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -1,3 +1,5 @@
+use std::collections::{BTreeMap, HashSet};
+
use axum::{
extract::State,
http::StatusCode,
@@ -9,8 +11,10 @@ use axum::{
Router,
};
use axum_extra::extract::Query;
-use chrono::{self, format::SecondsFormat};
-use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _};
+use futures::{
+ future,
+ stream::{self, Stream, StreamExt as _, TryStreamExt as _},
+};
use super::repo::broadcast;
use crate::{
@@ -25,6 +29,15 @@ use crate::{
#[cfg(test)]
mod test;
+// For the purposes of event replay, an "event ID" is a vector of per-channel
+// sequence numbers. Replay will start with messages whose sequence number in
+// its channel is higher than the sequence in the event ID, or if the channel
+// is not listed in the event ID, then at the beginning.
+//
+// Using a sorted map ensures that there is a canonical representation for
+// each event ID.
+type EventId = BTreeMap<channel::Id, broadcast::Sequence>;
+
pub fn router() -> Router<App> {
Router::new().route("/api/events", get(events))
}
@@ -32,22 +45,27 @@ pub fn router() -> Router<App> {
#[derive(Clone, serde::Deserialize)]
struct EventsQuery {
#[serde(default, rename = "channel")]
- channels: Vec<channel::Id>,
+ channels: HashSet<channel::Id>,
}
async fn events(
State(app): State<App>,
RequestedAt(now): RequestedAt,
_: Login, // requires auth, but doesn't actually care who you are
- last_event_id: Option<LastEventId>,
+ last_event_id: Option<LastEventId<EventId>>,
Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = ChannelEvent> + std::fmt::Debug>, ErrorResponse> {
- let resume_at = last_event_id.as_deref();
+) -> Result<Events<impl Stream<Item = ReplayableEvent> + std::fmt::Debug>, ErrorResponse> {
+ let resume_at = last_event_id
+ .map(LastEventId::into_inner)
+ .unwrap_or_default();
let streams = stream::iter(query.channels)
.then(|channel| {
let app = app.clone();
+ let resume_at = resume_at.clone();
async move {
+ let resume_at = resume_at.get(&channel).copied();
+
let events = app
.channels()
.events(&channel, &now, resume_at)
@@ -62,7 +80,18 @@ async fn events(
// impl From would take more code; this is used once.
.map_err(ErrorResponse)?;
- let stream = stream::select_all(streams);
+ // We resume counting from the provided last-event-id mapping, rather than
+ // starting from scratch, so that the events in a resumed stream contain
+ // the full vector of channel IDs for their event IDs right off the bat,
+ // even before any events are actually delivered.
+ let stream = stream::select_all(streams).scan(resume_at, |sequences, event| {
+ let (channel, sequence) = event.event_id();
+ sequences.insert(channel, sequence);
+
+ let event = ReplayableEvent(sequences.clone(), event);
+
+ future::ready(Some(event))
+ });
Ok(Events(stream))
}
@@ -72,7 +101,7 @@ struct Events<S>(S);
impl<S> IntoResponse for Events<S>
where
- S: Stream<Item = ChannelEvent> + Send + 'static,
+ S: Stream<Item = ReplayableEvent> + Send + 'static,
{
fn into_response(self) -> Response {
let Self(stream) = self;
@@ -101,6 +130,9 @@ impl IntoResponse for ErrorResponse {
}
}
+#[derive(Debug)]
+struct ReplayableEvent(EventId, ChannelEvent);
+
#[derive(Debug, serde::Serialize)]
struct ChannelEvent {
channel: channel::Id,
@@ -116,19 +148,21 @@ impl ChannelEvent {
}
}
- fn event_id(&self) -> String {
- self.message
- .sent_at
- .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)
+ fn event_id(&self) -> (channel::Id, broadcast::Sequence) {
+ (self.channel.clone(), self.message.sequence)
}
}
-impl TryFrom<ChannelEvent> for sse::Event {
+impl TryFrom<ReplayableEvent> for sse::Event {
type Error = serde_json::Error;
- fn try_from(value: ChannelEvent) -> Result<Self, Self::Error> {
- let data = serde_json::to_string_pretty(&value)?;
- let event = Self::default().id(value.event_id()).data(&data);
+ fn try_from(value: ReplayableEvent) -> Result<Self, Self::Error> {
+ let ReplayableEvent(id, data) = value;
+
+ let id = serde_json::to_string(&id)?;
+ let data = serde_json::to_string_pretty(&data)?;
+
+ let event = Self::default().id(id).data(data);
Ok(event)
}