use axum::{ extract::State, http::StatusCode, response::{ sse::{self, Sse}, IntoResponse, Response, }, routing::get, Router, }; use axum_extra::extract::Query; use chrono::{self, format::SecondsFormat}; use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _}; use super::repo::broadcast; use crate::{ app::App, channel::app::EventsError, clock::RequestedAt, error::InternalError, header::LastEventId, repo::{channel, login::Login}, }; pub fn router() -> Router { Router::new().route("/api/events", get(events)) } #[derive(serde::Deserialize)] struct EventsQuery { #[serde(default, rename = "channel")] channels: Vec, } async fn events( State(app): State, RequestedAt(now): RequestedAt, _: Login, // requires auth, but doesn't actually care who you are last_event_id: Option, Query(query): Query, ) -> Result>, ErrorResponse> { let resume_at = last_event_id.as_deref(); let streams = stream::iter(query.channels) .then(|channel| { let app = app.clone(); async move { let events = app .channels() .events(&channel, &now, resume_at) .await? .map(ChannelEvent::wrap(channel)); Ok::<_, EventsError>(events) } }) .try_collect::>() .await // impl From would take more code; this is used once. .map_err(ErrorResponse)?; let stream = stream::select_all(streams); Ok(Events(stream)) } struct Events(S); impl IntoResponse for Events where S: Stream + Send + 'static, { fn into_response(self) -> Response { let Self(stream) = self; let stream = stream.map(sse::Event::try_from); Sse::new(stream) .keep_alive(sse::KeepAlive::default()) .into_response() } } struct ErrorResponse(EventsError); impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { not_found @ EventsError::ChannelNotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } resume_at @ EventsError::ResumeAtError(_) => { (StatusCode::BAD_REQUEST, resume_at.to_string()).into_response() } other => InternalError::from(other).into_response(), } } } #[derive(serde::Serialize)] struct ChannelEvent { channel: channel::Id, #[serde(flatten)] message: broadcast::Message, } impl ChannelEvent { fn wrap(channel: channel::Id) -> impl Fn(broadcast::Message) -> Self { move |message| Self { channel: channel.clone(), message, } } fn event_id(&self) -> String { self.message .sent_at .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true) } } impl TryFrom for sse::Event { type Error = serde_json::Error; fn try_from(value: ChannelEvent) -> Result { let data = serde_json::to_string_pretty(&value)?; let event = Self::default().id(value.event_id()).data(&data); Ok(event) } }