summaryrefslogtreecommitdiff
path: root/src/events/routes.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/routes.rs')
-rw-r--r--src/events/routes.rs124
1 files changed, 14 insertions, 110 deletions
diff --git a/src/events/routes.rs b/src/events/routes.rs
index d901f9b..3f70dcd 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -1,8 +1,5 @@
-use std::collections::{BTreeMap, HashSet};
-
use axum::{
extract::State,
- http::StatusCode,
response::{
sse::{self, Sse},
IntoResponse, Response,
@@ -10,87 +7,32 @@ use axum::{
routing::get,
Router,
};
-use axum_extra::extract::Query;
-use futures::{
- future,
- stream::{self, Stream, StreamExt as _, TryStreamExt as _},
-};
+use futures::stream::{Stream, StreamExt as _};
-use super::{extract::LastEventId, repo::broadcast};
-use crate::{
- app::App,
- clock::RequestedAt,
- error::Internal,
- events::app::EventsError,
- repo::{channel, login::Login},
+use super::{
+ extract::LastEventId,
+ types::{self, ResumePoint},
};
+use crate::{app::App, clock::RequestedAt, error::Internal, repo::login::Login};
#[cfg(test)]
mod test;
-// For the purposes of event replay, an "event ID" is a vector of per-channel
-// sequence numbers. Replay will start with messages whose sequence number in
-// its channel is higher than the sequence in the event ID, or if the channel
-// is not listed in the event ID, then at the beginning.
-//
-// Using a sorted map ensures that there is a canonical representation for
-// each event ID.
-type EventId = BTreeMap<channel::Id, broadcast::Sequence>;
-
pub fn router() -> Router<App> {
Router::new().route("/api/events", get(events))
}
-#[derive(Clone, serde::Deserialize)]
-struct EventsQuery {
- #[serde(default, rename = "channel")]
- channels: HashSet<channel::Id>,
-}
-
async fn events(
State(app): State<App>,
- RequestedAt(now): RequestedAt,
+ RequestedAt(subscribed_at): RequestedAt,
_: Login, // requires auth, but doesn't actually care who you are
- last_event_id: Option<LastEventId<EventId>>,
- Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = ReplayableEvent> + std::fmt::Debug>, ErrorResponse> {
+ last_event_id: Option<LastEventId<ResumePoint>>,
+) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, Internal> {
let resume_at = last_event_id
.map(LastEventId::into_inner)
.unwrap_or_default();
- let streams = stream::iter(query.channels)
- .then(|channel| {
- let app = app.clone();
- let resume_at = resume_at.clone();
- async move {
- let resume_at = resume_at.get(&channel).copied();
-
- let events = app
- .events()
- .subscribe(&channel, &now, resume_at)
- .await?
- .map(ChannelEvent::wrap(channel));
-
- Ok::<_, EventsError>(events)
- }
- })
- .try_collect::<Vec<_>>()
- .await
- // impl From would take more code; this is used once.
- .map_err(ErrorResponse)?;
-
- // We resume counting from the provided last-event-id mapping, rather than
- // starting from scratch, so that the events in a resumed stream contain
- // the full vector of channel IDs for their event IDs right off the bat,
- // even before any events are actually delivered.
- let stream = stream::select_all(streams).scan(resume_at, |sequences, event| {
- let (channel, sequence) = event.event_id();
- sequences.insert(channel, sequence);
-
- let event = ReplayableEvent(sequences.clone(), event);
-
- future::ready(Some(event))
- });
+ let stream = app.events().subscribe(&subscribed_at, resume_at).await?;
Ok(Events(stream))
}
@@ -100,7 +42,7 @@ struct Events<S>(S);
impl<S> IntoResponse for Events<S>
where
- S: Stream<Item = ReplayableEvent> + Send + 'static,
+ S: Stream<Item = types::ResumableEvent> + Send + 'static,
{
fn into_response(self) -> Response {
let Self(stream) = self;
@@ -111,51 +53,13 @@ where
}
}
-#[derive(Debug)]
-struct ErrorResponse(EventsError);
-
-impl IntoResponse for ErrorResponse {
- fn into_response(self) -> Response {
- let Self(error) = self;
- match error {
- not_found @ EventsError::ChannelNotFound(_) => {
- (StatusCode::NOT_FOUND, not_found.to_string()).into_response()
- }
- other => Internal::from(other).into_response(),
- }
- }
-}
-
-#[derive(Debug)]
-struct ReplayableEvent(EventId, ChannelEvent);
-
-#[derive(Debug, serde::Serialize)]
-struct ChannelEvent {
- channel: channel::Id,
- #[serde(flatten)]
- message: broadcast::Message,
-}
-
-impl ChannelEvent {
- fn wrap(channel: channel::Id) -> impl Fn(broadcast::Message) -> Self {
- move |message| Self {
- channel: channel.clone(),
- message,
- }
- }
-
- fn event_id(&self) -> (channel::Id, broadcast::Sequence) {
- (self.channel.clone(), self.message.sequence)
- }
-}
-
-impl TryFrom<ReplayableEvent> for sse::Event {
+impl TryFrom<types::ResumableEvent> for sse::Event {
type Error = serde_json::Error;
- fn try_from(value: ReplayableEvent) -> Result<Self, Self::Error> {
- let ReplayableEvent(id, data) = value;
+ fn try_from(value: types::ResumableEvent) -> Result<Self, Self::Error> {
+ let types::ResumableEvent(resume_at, data) = value;
- let id = serde_json::to_string(&id)?;
+ let id = serde_json::to_string(&resume_at)?;
let data = serde_json::to_string_pretty(&data)?;
let event = Self::default().id(id).data(data);