summaryrefslogtreecommitdiff
path: root/src/events.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-15 21:50:34 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-15 22:01:20 -0400
commitae04a5605b939709552f9ecac91f00e734813980 (patch)
treef9e187bb4c5c9702ca62f4602cb21cda802546d7 /src/events.rs
parent5249aad35741f6f029c442a04d679937fb91d2bb (diff)
Consolidate channel events into a single stream endpoint.
While reviewing [MDN], I noticed this note: > SSE suffers from a limitation to the maximum number of open connections, which can be specially painful when opening various tabs as the limit is per browser and set to a very low number (6). […] This limit is per browser + domain, so that means that you can open 6 SSE connections across all of the tabs to www.example1.com and another 6 SSE connections to www.example2.com. I tested it in Safari; this is true, and once six streams are open, _no_ more requests can be made - in any tab, even a fresh one. Since the design _was_ that each channel had its own events endpoint, this is an obvious operations risk. Any client that tries to read multiple channels' streams will hit this limit quickly. This change consolidates all channel events into a single endpoint: `/events`. This takes a list of channel IDs (as query parameters, one `channel=` param per channel), and streams back events from all listed channels. The previous `/:channel/events` endpoint has been removed. Clients can selectively request events for the channels they're interested in. [MDN]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource
Diffstat (limited to 'src/events.rs')
-rw-r--r--src/events.rs105
1 files changed, 105 insertions, 0 deletions
diff --git a/src/events.rs b/src/events.rs
new file mode 100644
index 0000000..2d1e1f8
--- /dev/null
+++ b/src/events.rs
@@ -0,0 +1,105 @@
+use axum::{
+ extract::State,
+ http::{HeaderName, HeaderValue},
+ response::{
+ sse::{self, Sse},
+ IntoResponse,
+ },
+ routing::get,
+ Router,
+};
+use axum_extra::{extract::Query, typed_header::TypedHeader};
+use chrono::{format::SecondsFormat, DateTime};
+use futures::{
+ future,
+ stream::{self, StreamExt as _, TryStreamExt as _},
+};
+
+use crate::{
+ app::App,
+ channel::repo::{channels::Id as ChannelId, messages::BroadcastMessage},
+ error::BoxedError,
+ error::InternalError,
+ login::repo::logins::Login,
+};
+
+pub fn router() -> Router<App> {
+ Router::new().route("/events", get(on_events))
+}
+
+#[derive(serde::Deserialize)]
+struct EventsQuery {
+ #[serde(default, rename = "channel")]
+ channels: Vec<ChannelId>,
+}
+
+async fn on_events(
+ State(app): State<App>,
+ _: Login, // requires auth, but doesn't actually care who you are
+ last_event_id: Option<TypedHeader<LastEventId>>,
+ Query(query): Query<EventsQuery>,
+) -> Result<impl IntoResponse, InternalError> {
+ let resume_at = last_event_id
+ .map(|TypedHeader(header)| header)
+ .map(|LastEventId(header)| header)
+ .map(|header| DateTime::parse_from_rfc3339(&header))
+ .transpose()?
+ .map(|ts| ts.to_utc());
+
+ let streams = stream::iter(query.channels)
+ .then(|channel| {
+ let app = app.clone();
+ async move { app.channels().events(&channel, resume_at.as_ref()).await }
+ })
+ .try_collect::<Vec<_>>()
+ .await?;
+
+ let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg)));
+ let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default());
+
+ Ok(sse)
+}
+
+fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
+ let data = serde_json::to_string(&msg)?;
+ let event = sse::Event::default()
+ .id(msg
+ .sent_at
+ .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
+ .data(&data);
+
+ Ok(event)
+}
+
+pub struct LastEventId(pub String);
+
+static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");
+
+impl headers::Header for LastEventId {
+ fn name() -> &'static HeaderName {
+ &LAST_EVENT_ID
+ }
+
+ fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
+ where
+ I: Iterator<Item = &'i HeaderValue>,
+ {
+ let value = values.next().ok_or_else(headers::Error::invalid)?;
+ if let Ok(value) = value.to_str() {
+ Ok(Self(value.into()))
+ } else {
+ Err(headers::Error::invalid())
+ }
+ }
+
+ fn encode<E>(&self, values: &mut E)
+ where
+ E: Extend<HeaderValue>,
+ {
+ let Self(value) = self;
+ // Must panic or suppress; the trait provides no other options.
+ let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value");
+
+ values.extend(std::iter::once(value));
+ }
+}