From ae04a5605b939709552f9ecac91f00e734813980 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Sun, 15 Sep 2024 21:50:34 -0400 Subject: Consolidate channel events into a single stream endpoint. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit While reviewing [MDN], I noticed this note: > SSE suffers from a limitation to the maximum number of open connections, which can be specially painful when opening various tabs as the limit is per browser and set to a very low number (6). […] This limit is per browser + domain, so that means that you can open 6 SSE connections across all of the tabs to www.example1.com and another 6 SSE connections to www.example2.com. I tested it in Safari; this is true, and once six streams are open, _no_ more requests can be made - in any tab, even a fresh one. Since the design _was_ that each channel had its own events endpoint, this is an obvious operations risk. Any client that tries to read multiple channels' streams will hit this limit quickly. This change consolidates all channel events into a single endpoint: `/events`. This takes a list of channel IDs (as query parameters, one `channel=` param per channel), and streams back events from all listed channels. The previous `/:channel/events` endpoint has been removed. Clients can selectively request events for the channels they're interested in. [MDN]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource --- src/events.rs | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 src/events.rs (limited to 'src/events.rs') diff --git a/src/events.rs b/src/events.rs new file mode 100644 index 0000000..2d1e1f8 --- /dev/null +++ b/src/events.rs @@ -0,0 +1,105 @@ +use axum::{ + extract::State, + http::{HeaderName, HeaderValue}, + response::{ + sse::{self, Sse}, + IntoResponse, + }, + routing::get, + Router, +}; +use axum_extra::{extract::Query, typed_header::TypedHeader}; +use chrono::{format::SecondsFormat, DateTime}; +use futures::{ + future, + stream::{self, StreamExt as _, TryStreamExt as _}, +}; + +use crate::{ + app::App, + channel::repo::{channels::Id as ChannelId, messages::BroadcastMessage}, + error::BoxedError, + error::InternalError, + login::repo::logins::Login, +}; + +pub fn router() -> Router { + Router::new().route("/events", get(on_events)) +} + +#[derive(serde::Deserialize)] +struct EventsQuery { + #[serde(default, rename = "channel")] + channels: Vec, +} + +async fn on_events( + State(app): State, + _: Login, // requires auth, but doesn't actually care who you are + last_event_id: Option>, + Query(query): Query, +) -> Result { + let resume_at = last_event_id + .map(|TypedHeader(header)| header) + .map(|LastEventId(header)| header) + .map(|header| DateTime::parse_from_rfc3339(&header)) + .transpose()? + .map(|ts| ts.to_utc()); + + let streams = stream::iter(query.channels) + .then(|channel| { + let app = app.clone(); + async move { app.channels().events(&channel, resume_at.as_ref()).await } + }) + .try_collect::>() + .await?; + + let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg))); + let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default()); + + Ok(sse) +} + +fn to_event(msg: BroadcastMessage) -> Result { + let data = serde_json::to_string(&msg)?; + let event = sse::Event::default() + .id(msg + .sent_at + .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) + .data(&data); + + Ok(event) +} + +pub struct LastEventId(pub String); + +static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); + +impl headers::Header for LastEventId { + fn name() -> &'static HeaderName { + &LAST_EVENT_ID + } + + fn decode<'i, I>(values: &mut I) -> Result + where + I: Iterator, + { + let value = values.next().ok_or_else(headers::Error::invalid)?; + if let Ok(value) = value.to_str() { + Ok(Self(value.into())) + } else { + Err(headers::Error::invalid()) + } + } + + fn encode(&self, values: &mut E) + where + E: Extend, + { + let Self(value) = self; + // Must panic or suppress; the trait provides no other options. + let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value"); + + values.extend(std::iter::once(value)); + } +} -- cgit v1.2.3