use axum::{ extract::State, http::{HeaderName, HeaderValue}, response::{ sse::{self, Sse}, IntoResponse, }, routing::get, Router, }; use axum_extra::{extract::Query, typed_header::TypedHeader}; use chrono::{format::SecondsFormat, DateTime}; use futures::{ future, stream::{self, StreamExt as _, TryStreamExt as _}, }; use crate::{ app::App, channel::repo::{channels::Id as ChannelId, messages::BroadcastMessage}, error::BoxedError, error::InternalError, login::repo::logins::Login, }; pub fn router() -> Router { Router::new().route("/events", get(on_events)) } #[derive(serde::Deserialize)] struct EventsQuery { #[serde(default, rename = "channel")] channels: Vec, } async fn on_events( State(app): State, _: Login, // requires auth, but doesn't actually care who you are last_event_id: Option>, Query(query): Query, ) -> Result { let resume_at = last_event_id .map(|TypedHeader(header)| header) .map(|LastEventId(header)| header) .map(|header| DateTime::parse_from_rfc3339(&header)) .transpose()? .map(|ts| ts.to_utc()); let streams = stream::iter(query.channels) .then(|channel| { let app = app.clone(); async move { app.channels().events(&channel, resume_at.as_ref()).await } }) .try_collect::>() .await?; let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg))); let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default()); Ok(sse) } fn to_event(msg: BroadcastMessage) -> Result { let data = serde_json::to_string(&msg)?; let event = sse::Event::default() .id(msg .sent_at .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) .data(&data); Ok(event) } pub struct LastEventId(pub String); static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); impl headers::Header for LastEventId { fn name() -> &'static HeaderName { &LAST_EVENT_ID } fn decode<'i, I>(values: &mut I) -> Result where I: Iterator, { let value = values.next().ok_or_else(headers::Error::invalid)?; if let Ok(value) = value.to_str() { Ok(Self(value.into())) } else { Err(headers::Error::invalid()) } } fn encode(&self, values: &mut E) where E: Extend, { let Self(value) = self; // Must panic or suppress; the trait provides no other options. let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value"); values.extend(std::iter::once(value)); } }