diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-15 21:50:34 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-15 22:01:20 -0400 |
| commit | ae04a5605b939709552f9ecac91f00e734813980 (patch) | |
| tree | f9e187bb4c5c9702ca62f4602cb21cda802546d7 /src | |
| parent | 5249aad35741f6f029c442a04d679937fb91d2bb (diff) | |
Consolidate channel events into a single stream endpoint.
While reviewing [MDN], I noticed this note:
> SSE suffers from a limitation to the maximum number of open connections, which can be specially painful when opening various tabs as the limit is per browser and set to a very low number (6). […] This limit is per browser + domain, so that means that you can open 6 SSE connections across all of the tabs to www.example1.com and another 6 SSE connections to www.example2.com.
I tested it in Safari; this is true, and once six streams are open, _no_ more requests can be made - in any tab, even a fresh one.
Since the design _was_ that each channel had its own events endpoint, this is an obvious operations risk. Any client that tries to read multiple channels' streams will hit this limit quickly.
This change consolidates all channel events into a single endpoint: `/events`. This takes a list of channel IDs (as query parameters, one `channel=` param per channel), and streams back events from all listed channels. The previous `/:channel/events` endpoint has been removed. Clients can selectively request events for the channels they're interested in.
[MDN]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource
Diffstat (limited to 'src')
| -rw-r--r-- | src/channel/app.rs | 5 | ||||
| -rw-r--r-- | src/channel/header.rs | 34 | ||||
| -rw-r--r-- | src/channel/mod.rs | 1 | ||||
| -rw-r--r-- | src/channel/repo/messages.rs | 19 | ||||
| -rw-r--r-- | src/channel/routes.rs | 54 | ||||
| -rw-r--r-- | src/cli.rs | 4 | ||||
| -rw-r--r-- | src/events.rs | 105 | ||||
| -rw-r--r-- | src/index/templates.rs | 3 | ||||
| -rw-r--r-- | src/lib.rs | 1 |
9 files changed, 131 insertions, 95 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index c0a6d60..5417a5e 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex, MutexGuard}; use futures::{ future, stream::{self, StreamExt as _, TryStreamExt as _}, - TryStream, + Stream, }; use sqlx::sqlite::SqlitePool; use tokio::sync::broadcast::{channel, Sender}; @@ -57,7 +57,8 @@ impl<'a> Channels<'a> { &self, channel: &ChannelId, resume_at: Option<&DateTime>, - ) -> Result<impl TryStream<Ok = BroadcastMessage, Error = BoxedError>, BoxedError> { + ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>> + 'static, BoxedError> + { fn skip_stale<E>( resume_at: Option<&DateTime>, ) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> { diff --git a/src/channel/header.rs b/src/channel/header.rs deleted file mode 100644 index eda8214..0000000 --- a/src/channel/header.rs +++ /dev/null @@ -1,34 +0,0 @@ -use axum::http::{HeaderName, HeaderValue}; - -pub struct LastEventId(pub String); - -static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); - -impl headers::Header for LastEventId { - fn name() -> &'static HeaderName { - &LAST_EVENT_ID - } - - fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error> - where - I: Iterator<Item = &'i HeaderValue>, - { - let value = values.next().ok_or_else(headers::Error::invalid)?; - if let Ok(value) = value.to_str() { - Ok(Self(value.into())) - } else { - Err(headers::Error::invalid()) - } - } - - fn encode<E>(&self, values: &mut E) - where - E: Extend<HeaderValue>, - { - let Self(value) = self; - // Must panic or suppress; the trait provides no other options. - let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value"); - - values.extend(std::iter::once(value)); - } -} diff --git a/src/channel/mod.rs b/src/channel/mod.rs index bc2cc6c..f67ea04 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -1,5 +1,4 @@ pub mod app; -mod header; pub mod repo; mod routes; diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs index b465f61..e15a02a 100644 --- a/src/channel/repo/messages.rs +++ b/src/channel/repo/messages.rs @@ -26,6 +26,7 @@ pub struct Messages<'t>(&'t mut SqliteConnection); pub struct BroadcastMessage { pub id: Id, pub sender: Login, + pub channel: ChannelId, pub body: String, pub sent_at: DateTime, } @@ -49,6 +50,8 @@ impl<'c> Messages<'c> { values ($1, $2, $3, $4, $5) returning id as "id: Id", + sender as "sender: LoginId", + channel as "channel: ChannelId", body, sent_at as "sent_at: DateTime" "#, @@ -58,11 +61,15 @@ impl<'c> Messages<'c> { body, sent_at, ) - .map(|row| BroadcastMessage { - sender: sender.clone(), - id: row.id, - body: row.body, - sent_at: row.sent_at, + .map(|row| { + debug_assert!(row.sender == sender.id); + BroadcastMessage { + id: row.id, + sender: sender.clone(), + channel: row.channel, + body: row.body, + sent_at: row.sent_at, + } }) .fetch_one(&mut *self.0) .await?; @@ -81,6 +88,7 @@ impl<'c> Messages<'c> { message.id as "id: Id", login.id as "sender_id: LoginId", login.name as sender_name, + message.channel as "channel: ChannelId", message.body, message.sent_at as "sent_at: DateTime" from message @@ -98,6 +106,7 @@ impl<'c> Messages<'c> { id: row.sender_id, name: row.sender_name, }, + channel: row.channel, body: row.body, sent_at: row.sent_at, }) diff --git a/src/channel/routes.rs b/src/channel/routes.rs index eae68a2..3c2353b 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -1,30 +1,17 @@ use axum::{ extract::{Form, Path, State}, - response::{ - sse::{self, Sse}, - IntoResponse, Redirect, - }, - routing::{get, post}, + response::{IntoResponse, Redirect}, + routing::post, Router, }; -use axum_extra::TypedHeader; -use chrono::{format::SecondsFormat, DateTime}; -use futures::{future, stream::TryStreamExt as _}; -use super::{ - header::LastEventId, - repo::{channels::Id as ChannelId, messages::BroadcastMessage}, -}; -use crate::{ - app::App, clock::RequestedAt, error::BoxedError, error::InternalError, - login::repo::logins::Login, -}; +use super::repo::channels::Id as ChannelId; +use crate::{app::App, clock::RequestedAt, error::InternalError, login::repo::logins::Login}; pub fn router() -> Router<App> { Router::new() .route("/create", post(on_create)) .route("/:channel/send", post(on_send)) - .route("/:channel/events", get(on_events)) } #[derive(serde::Deserialize)] @@ -60,36 +47,3 @@ async fn on_send( Ok(Redirect::to(&format!("/{}", channel))) } - -async fn on_events( - Path(channel): Path<ChannelId>, - State(app): State<App>, - _: Login, // requires auth, but doesn't actually care who you are - last_event_id: Option<TypedHeader<LastEventId>>, -) -> Result<impl IntoResponse, InternalError> { - let resume_at = last_event_id - .map(|TypedHeader(header)| header) - .map(|LastEventId(header)| header) - .map(|header| DateTime::parse_from_rfc3339(&header)) - .transpose()? - .map(|ts| ts.to_utc()); - - let stream = app - .channels() - .events(&channel, resume_at.as_ref()) - .await? - .and_then(|msg| future::ready(to_event(msg))); - - Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default())) -} - -fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> { - let data = serde_json::to_string(&msg)?; - let event = sse::Event::default() - .id(msg - .sent_at - .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) - .data(&data); - - Ok(event) -} @@ -6,7 +6,7 @@ use clap::Parser; use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; use tokio::net; -use crate::{app::App, channel, clock, error::BoxedError, index, login}; +use crate::{app::App, channel, clock, error::BoxedError, events, index, login}; pub type Result<T> = std::result::Result<T, BoxedError>; @@ -64,7 +64,7 @@ impl Args { } fn routers() -> Router<App> { - [channel::router(), login::router()] + [channel::router(), events::router(), login::router()] .into_iter() .fold(index::routes::router(), Router::merge) } diff --git a/src/events.rs b/src/events.rs new file mode 100644 index 0000000..2d1e1f8 --- /dev/null +++ b/src/events.rs @@ -0,0 +1,105 @@ +use axum::{ + extract::State, + http::{HeaderName, HeaderValue}, + response::{ + sse::{self, Sse}, + IntoResponse, + }, + routing::get, + Router, +}; +use axum_extra::{extract::Query, typed_header::TypedHeader}; +use chrono::{format::SecondsFormat, DateTime}; +use futures::{ + future, + stream::{self, StreamExt as _, TryStreamExt as _}, +}; + +use crate::{ + app::App, + channel::repo::{channels::Id as ChannelId, messages::BroadcastMessage}, + error::BoxedError, + error::InternalError, + login::repo::logins::Login, +}; + +pub fn router() -> Router<App> { + Router::new().route("/events", get(on_events)) +} + +#[derive(serde::Deserialize)] +struct EventsQuery { + #[serde(default, rename = "channel")] + channels: Vec<ChannelId>, +} + +async fn on_events( + State(app): State<App>, + _: Login, // requires auth, but doesn't actually care who you are + last_event_id: Option<TypedHeader<LastEventId>>, + Query(query): Query<EventsQuery>, +) -> Result<impl IntoResponse, InternalError> { + let resume_at = last_event_id + .map(|TypedHeader(header)| header) + .map(|LastEventId(header)| header) + .map(|header| DateTime::parse_from_rfc3339(&header)) + .transpose()? + .map(|ts| ts.to_utc()); + + let streams = stream::iter(query.channels) + .then(|channel| { + let app = app.clone(); + async move { app.channels().events(&channel, resume_at.as_ref()).await } + }) + .try_collect::<Vec<_>>() + .await?; + + let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg))); + let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default()); + + Ok(sse) +} + +fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> { + let data = serde_json::to_string(&msg)?; + let event = sse::Event::default() + .id(msg + .sent_at + .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) + .data(&data); + + Ok(event) +} + +pub struct LastEventId(pub String); + +static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); + +impl headers::Header for LastEventId { + fn name() -> &'static HeaderName { + &LAST_EVENT_ID + } + + fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error> + where + I: Iterator<Item = &'i HeaderValue>, + { + let value = values.next().ok_or_else(headers::Error::invalid)?; + if let Ok(value) = value.to_str() { + Ok(Self(value.into())) + } else { + Err(headers::Error::invalid()) + } + } + + fn encode<E>(&self, values: &mut E) + where + E: Extend<HeaderValue>, + { + let Self(value) = self; + // Must panic or suppress; the trait provides no other options. + let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value"); + + values.extend(std::iter::once(value)); + } +} diff --git a/src/index/templates.rs b/src/index/templates.rs index 7472fd0..a69c19a 100644 --- a/src/index/templates.rs +++ b/src/index/templates.rs @@ -105,7 +105,8 @@ pub fn channel(channel: &Channel) -> Markup { span.sent_at { "(sent_at)" } ")" } } - link rel="events" href=(format!("/{}/events", channel.id)) {} + meta name="channel" content=(channel.id) {} + link rel="events" href=(format!("/events?channel={}", channel.id)) {} } body { section class="messages" {} @@ -3,6 +3,7 @@ mod channel; pub mod cli; mod clock; mod error; +mod events; mod id; mod index; mod login; |
