diff options
Diffstat (limited to 'src/events/routes.rs')
| -rw-r--r-- | src/events/routes.rs | 124 |
1 files changed, 14 insertions, 110 deletions
diff --git a/src/events/routes.rs b/src/events/routes.rs index d901f9b..3f70dcd 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -1,8 +1,5 @@ -use std::collections::{BTreeMap, HashSet}; - use axum::{ extract::State, - http::StatusCode, response::{ sse::{self, Sse}, IntoResponse, Response, @@ -10,87 +7,32 @@ use axum::{ routing::get, Router, }; -use axum_extra::extract::Query; -use futures::{ - future, - stream::{self, Stream, StreamExt as _, TryStreamExt as _}, -}; +use futures::stream::{Stream, StreamExt as _}; -use super::{extract::LastEventId, repo::broadcast}; -use crate::{ - app::App, - clock::RequestedAt, - error::Internal, - events::app::EventsError, - repo::{channel, login::Login}, +use super::{ + extract::LastEventId, + types::{self, ResumePoint}, }; +use crate::{app::App, clock::RequestedAt, error::Internal, repo::login::Login}; #[cfg(test)] mod test; -// For the purposes of event replay, an "event ID" is a vector of per-channel -// sequence numbers. Replay will start with messages whose sequence number in -// its channel is higher than the sequence in the event ID, or if the channel -// is not listed in the event ID, then at the beginning. -// -// Using a sorted map ensures that there is a canonical representation for -// each event ID. -type EventId = BTreeMap<channel::Id, broadcast::Sequence>; - pub fn router() -> Router<App> { Router::new().route("/api/events", get(events)) } -#[derive(Clone, serde::Deserialize)] -struct EventsQuery { - #[serde(default, rename = "channel")] - channels: HashSet<channel::Id>, -} - async fn events( State(app): State<App>, - RequestedAt(now): RequestedAt, + RequestedAt(subscribed_at): RequestedAt, _: Login, // requires auth, but doesn't actually care who you are - last_event_id: Option<LastEventId<EventId>>, - Query(query): Query<EventsQuery>, -) -> Result<Events<impl Stream<Item = ReplayableEvent> + std::fmt::Debug>, ErrorResponse> { + last_event_id: Option<LastEventId<ResumePoint>>, +) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, Internal> { let resume_at = last_event_id .map(LastEventId::into_inner) .unwrap_or_default(); - let streams = stream::iter(query.channels) - .then(|channel| { - let app = app.clone(); - let resume_at = resume_at.clone(); - async move { - let resume_at = resume_at.get(&channel).copied(); - - let events = app - .events() - .subscribe(&channel, &now, resume_at) - .await? - .map(ChannelEvent::wrap(channel)); - - Ok::<_, EventsError>(events) - } - }) - .try_collect::<Vec<_>>() - .await - // impl From would take more code; this is used once. - .map_err(ErrorResponse)?; - - // We resume counting from the provided last-event-id mapping, rather than - // starting from scratch, so that the events in a resumed stream contain - // the full vector of channel IDs for their event IDs right off the bat, - // even before any events are actually delivered. - let stream = stream::select_all(streams).scan(resume_at, |sequences, event| { - let (channel, sequence) = event.event_id(); - sequences.insert(channel, sequence); - - let event = ReplayableEvent(sequences.clone(), event); - - future::ready(Some(event)) - }); + let stream = app.events().subscribe(&subscribed_at, resume_at).await?; Ok(Events(stream)) } @@ -100,7 +42,7 @@ struct Events<S>(S); impl<S> IntoResponse for Events<S> where - S: Stream<Item = ReplayableEvent> + Send + 'static, + S: Stream<Item = types::ResumableEvent> + Send + 'static, { fn into_response(self) -> Response { let Self(stream) = self; @@ -111,51 +53,13 @@ where } } -#[derive(Debug)] -struct ErrorResponse(EventsError); - -impl IntoResponse for ErrorResponse { - fn into_response(self) -> Response { - let Self(error) = self; - match error { - not_found @ EventsError::ChannelNotFound(_) => { - (StatusCode::NOT_FOUND, not_found.to_string()).into_response() - } - other => Internal::from(other).into_response(), - } - } -} - -#[derive(Debug)] -struct ReplayableEvent(EventId, ChannelEvent); - -#[derive(Debug, serde::Serialize)] -struct ChannelEvent { - channel: channel::Id, - #[serde(flatten)] - message: broadcast::Message, -} - -impl ChannelEvent { - fn wrap(channel: channel::Id) -> impl Fn(broadcast::Message) -> Self { - move |message| Self { - channel: channel.clone(), - message, - } - } - - fn event_id(&self) -> (channel::Id, broadcast::Sequence) { - (self.channel.clone(), self.message.sequence) - } -} - -impl TryFrom<ReplayableEvent> for sse::Event { +impl TryFrom<types::ResumableEvent> for sse::Event { type Error = serde_json::Error; - fn try_from(value: ReplayableEvent) -> Result<Self, Self::Error> { - let ReplayableEvent(id, data) = value; + fn try_from(value: types::ResumableEvent) -> Result<Self, Self::Error> { + let types::ResumableEvent(resume_at, data) = value; - let id = serde_json::to_string(&id)?; + let id = serde_json::to_string(&resume_at)?; let data = serde_json::to_string_pretty(&data)?; let event = Self::default().id(id).data(data); |
