diff options
Diffstat (limited to 'src/events.rs')
| -rw-r--r-- | src/events.rs | 105 |
1 files changed, 105 insertions, 0 deletions
diff --git a/src/events.rs b/src/events.rs new file mode 100644 index 0000000..2d1e1f8 --- /dev/null +++ b/src/events.rs @@ -0,0 +1,105 @@ +use axum::{ + extract::State, + http::{HeaderName, HeaderValue}, + response::{ + sse::{self, Sse}, + IntoResponse, + }, + routing::get, + Router, +}; +use axum_extra::{extract::Query, typed_header::TypedHeader}; +use chrono::{format::SecondsFormat, DateTime}; +use futures::{ + future, + stream::{self, StreamExt as _, TryStreamExt as _}, +}; + +use crate::{ + app::App, + channel::repo::{channels::Id as ChannelId, messages::BroadcastMessage}, + error::BoxedError, + error::InternalError, + login::repo::logins::Login, +}; + +pub fn router() -> Router<App> { + Router::new().route("/events", get(on_events)) +} + +#[derive(serde::Deserialize)] +struct EventsQuery { + #[serde(default, rename = "channel")] + channels: Vec<ChannelId>, +} + +async fn on_events( + State(app): State<App>, + _: Login, // requires auth, but doesn't actually care who you are + last_event_id: Option<TypedHeader<LastEventId>>, + Query(query): Query<EventsQuery>, +) -> Result<impl IntoResponse, InternalError> { + let resume_at = last_event_id + .map(|TypedHeader(header)| header) + .map(|LastEventId(header)| header) + .map(|header| DateTime::parse_from_rfc3339(&header)) + .transpose()? + .map(|ts| ts.to_utc()); + + let streams = stream::iter(query.channels) + .then(|channel| { + let app = app.clone(); + async move { app.channels().events(&channel, resume_at.as_ref()).await } + }) + .try_collect::<Vec<_>>() + .await?; + + let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg))); + let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default()); + + Ok(sse) +} + +fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> { + let data = serde_json::to_string(&msg)?; + let event = sse::Event::default() + .id(msg + .sent_at + .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) + .data(&data); + + Ok(event) +} + +pub struct LastEventId(pub String); + +static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); + +impl headers::Header for LastEventId { + fn name() -> &'static HeaderName { + &LAST_EVENT_ID + } + + fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error> + where + I: Iterator<Item = &'i HeaderValue>, + { + let value = values.next().ok_or_else(headers::Error::invalid)?; + if let Ok(value) = value.to_str() { + Ok(Self(value.into())) + } else { + Err(headers::Error::invalid()) + } + } + + fn encode<E>(&self, values: &mut E) + where + E: Extend<HeaderValue>, + { + let Self(value) = self; + // Must panic or suppress; the trait provides no other options. + let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value"); + + values.extend(std::iter::once(value)); + } +} |
