diff options
Diffstat (limited to 'src/events/routes.rs')
| -rw-r--r-- | src/events/routes.rs | 66 |
1 files changed, 50 insertions, 16 deletions
diff --git a/src/events/routes.rs b/src/events/routes.rs index a6bf5d9..7731680 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -1,3 +1,5 @@ +use std::collections::{BTreeMap, HashSet}; + use axum::{ extract::State, http::StatusCode, @@ -9,8 +11,10 @@ use axum::{ Router, }; use axum_extra::extract::Query; -use chrono::{self, format::SecondsFormat}; -use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _}; +use futures::{ + future, + stream::{self, Stream, StreamExt as _, TryStreamExt as _}, +}; use super::repo::broadcast; use crate::{ @@ -25,6 +29,15 @@ use crate::{ #[cfg(test)] mod test; +// For the purposes of event replay, an "event ID" is a vector of per-channel +// sequence numbers. Replay will start with messages whose sequence number in +// its channel is higher than the sequence in the event ID, or if the channel +// is not listed in the event ID, then at the beginning. +// +// Using a sorted map ensures that there is a canonical representation for +// each event ID. +type EventId = BTreeMap<channel::Id, broadcast::Sequence>; + pub fn router() -> Router<App> { Router::new().route("/api/events", get(events)) } @@ -32,22 +45,27 @@ pub fn router() -> Router<App> { #[derive(Clone, serde::Deserialize)] struct EventsQuery { #[serde(default, rename = "channel")] - channels: Vec<channel::Id>, + channels: HashSet<channel::Id>, } async fn events( State(app): State<App>, RequestedAt(now): RequestedAt, _: Login, // requires auth, but doesn't actually care who you are - last_event_id: Option<LastEventId>, + last_event_id: Option<LastEventId<EventId>>, Query(query): Query<EventsQuery>, -) -> Result<Events<impl Stream<Item = ChannelEvent> + std::fmt::Debug>, ErrorResponse> { - let resume_at = last_event_id.as_deref(); +) -> Result<Events<impl Stream<Item = ReplayableEvent> + std::fmt::Debug>, ErrorResponse> { + let resume_at = last_event_id + .map(LastEventId::into_inner) + .unwrap_or_default(); let streams = stream::iter(query.channels) .then(|channel| { let app = app.clone(); + let resume_at = resume_at.clone(); async move { + let resume_at = resume_at.get(&channel).copied(); + let events = app .channels() .events(&channel, &now, resume_at) @@ -62,7 +80,18 @@ async fn events( // impl From would take more code; this is used once. .map_err(ErrorResponse)?; - let stream = stream::select_all(streams); + // We resume counting from the provided last-event-id mapping, rather than + // starting from scratch, so that the events in a resumed stream contain + // the full vector of channel IDs for their event IDs right off the bat, + // even before any events are actually delivered. + let stream = stream::select_all(streams).scan(resume_at, |sequences, event| { + let (channel, sequence) = event.event_id(); + sequences.insert(channel, sequence); + + let event = ReplayableEvent(sequences.clone(), event); + + future::ready(Some(event)) + }); Ok(Events(stream)) } @@ -72,7 +101,7 @@ struct Events<S>(S); impl<S> IntoResponse for Events<S> where - S: Stream<Item = ChannelEvent> + Send + 'static, + S: Stream<Item = ReplayableEvent> + Send + 'static, { fn into_response(self) -> Response { let Self(stream) = self; @@ -101,6 +130,9 @@ impl IntoResponse for ErrorResponse { } } +#[derive(Debug)] +struct ReplayableEvent(EventId, ChannelEvent); + #[derive(Debug, serde::Serialize)] struct ChannelEvent { channel: channel::Id, @@ -116,19 +148,21 @@ impl ChannelEvent { } } - fn event_id(&self) -> String { - self.message - .sent_at - .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true) + fn event_id(&self) -> (channel::Id, broadcast::Sequence) { + (self.channel.clone(), self.message.sequence) } } -impl TryFrom<ChannelEvent> for sse::Event { +impl TryFrom<ReplayableEvent> for sse::Event { type Error = serde_json::Error; - fn try_from(value: ChannelEvent) -> Result<Self, Self::Error> { - let data = serde_json::to_string_pretty(&value)?; - let event = Self::default().id(value.event_id()).data(&data); + fn try_from(value: ReplayableEvent) -> Result<Self, Self::Error> { + let ReplayableEvent(id, data) = value; + + let id = serde_json::to_string(&id)?; + let data = serde_json::to_string_pretty(&data)?; + + let event = Self::default().id(id).data(data); Ok(event) } |
