diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-13 22:30:02 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-13 23:12:31 -0400 |
| commit | 407ca8df6284ce1a4c649b018c7326fd195bbd26 (patch) | |
| tree | 876091c17efbd765a4c7ef339548c0ff4dfb96d5 | |
| parent | 388a3d5a925aef7ff39339454ae0d720e05f038e (diff) | |
Support Last-Event-Id as a method of resuming channel events after a disconnect
| -rw-r--r-- | .sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json (renamed from .sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json) | 6 | ||||
| -rw-r--r-- | Cargo.lock | 36 | ||||
| -rw-r--r-- | Cargo.toml | 3 | ||||
| -rw-r--r-- | src/channel/app.rs | 30 | ||||
| -rw-r--r-- | src/channel/header.rs | 34 | ||||
| -rw-r--r-- | src/channel/mod.rs | 1 | ||||
| -rw-r--r-- | src/channel/repo/messages.rs | 3 | ||||
| -rw-r--r-- | src/channel/routes.rs | 31 |
8 files changed, 129 insertions, 15 deletions
diff --git a/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json b/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json index e5c0eae..b94bb4b 100644 --- a/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json +++ b/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n order by sent_at asc\n ", + "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n and coalesce(sent_at > $2, true)\n order by sent_at asc\n ", "describe": { "columns": [ { @@ -30,7 +30,7 @@ } ], "parameters": { - "Right": 1 + "Right": 2 }, "nullable": [ false, @@ -40,5 +40,5 @@ false ] }, - "hash": "d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597" + "hash": "64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130" } @@ -203,6 +203,7 @@ dependencies = [ "bytes", "cookie", "futures-util", + "headers", "http", "http-body", "http-body-util", @@ -232,6 +233,12 @@ dependencies = [ [[package]] name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + +[[package]] +name = "base64" version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" @@ -714,6 +721,30 @@ dependencies = [ ] [[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http", +] + +[[package]] name = "heck" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -742,6 +773,7 @@ dependencies = [ "chrono", "clap", "futures", + "headers", "maud", "password-hash", "rand", @@ -1665,7 +1697,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "936cac0ab331b14cb3921c62156d913e4c15b74fb6ec0f3146bd4ef6e4fb3c12" dependencies = [ "atoi", - "base64", + "base64 0.22.1", "bitflags 2.6.0", "byteorder", "bytes", @@ -1708,7 +1740,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9734dbce698c67ecf67c442f768a5e90a49b2a4d61a9f1d59f73874bd4cf0710" dependencies = [ "atoi", - "base64", + "base64 0.22.1", "bitflags 2.6.0", "byteorder", "chrono", @@ -7,10 +7,11 @@ edition = "2021" argon2 = "0.5.3" async-trait = "0.1.82" axum = { version = "0.7.5", features = ["form"] } -axum-extra = { version = "0.9.3", features = ["cookie"] } +axum-extra = { version = "0.9.3", features = ["cookie", "typed-header"] } chrono = { version = "0.4.38", features = ["serde"] } clap = { version = "4.5.16", features = ["derive", "env"] } futures = "0.3.30" +headers = "0.4.0" maud = { version = "0.26.0", features = ["axum"] } password-hash = { version = "0.5.0", features = ["std"] } rand = "0.8.5" diff --git a/src/channel/app.rs b/src/channel/app.rs index e242c2f..c0a6d60 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,8 +2,9 @@ use std::collections::{hash_map::Entry, HashMap}; use std::sync::{Arc, Mutex, MutexGuard}; use futures::{ + future, stream::{self, StreamExt as _, TryStreamExt as _}, - Stream, + TryStream, }; use sqlx::sqlite::SqlitePool; use tokio::sync::broadcast::{channel, Sender}; @@ -55,14 +56,33 @@ impl<'a> Channels<'a> { pub async fn events( &self, channel: &ChannelId, - ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>>, BoxedError> { - let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from); + resume_at: Option<&DateTime>, + ) -> Result<impl TryStream<Ok = BroadcastMessage, Error = BoxedError>, BoxedError> { + fn skip_stale<E>( + resume_at: Option<&DateTime>, + ) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> { + let resume_at = resume_at.cloned(); + move |msg| { + future::ready(Ok(match resume_at { + None => false, + Some(resume_at) => msg.sent_at <= resume_at, + })) + } + } + + let live_messages = self + .broadcaster + .listen(channel) + .map_err(BoxedError::from) + .try_skip_while(skip_stale(resume_at)); let mut tx = self.db.begin().await?; - let stored_messages = tx.messages().for_replay(channel).await?; + let stored_messages = tx.messages().for_replay(channel, resume_at).await?; tx.commit().await?; - Ok(stream::iter(stored_messages).map(Ok).chain(live_messages)) + let stored_messages = stream::iter(stored_messages).map(Ok); + + Ok(stored_messages.chain(live_messages)) } } diff --git a/src/channel/header.rs b/src/channel/header.rs new file mode 100644 index 0000000..eda8214 --- /dev/null +++ b/src/channel/header.rs @@ -0,0 +1,34 @@ +use axum::http::{HeaderName, HeaderValue}; + +pub struct LastEventId(pub String); + +static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); + +impl headers::Header for LastEventId { + fn name() -> &'static HeaderName { + &LAST_EVENT_ID + } + + fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error> + where + I: Iterator<Item = &'i HeaderValue>, + { + let value = values.next().ok_or_else(headers::Error::invalid)?; + if let Ok(value) = value.to_str() { + Ok(Self(value.into())) + } else { + Err(headers::Error::invalid()) + } + } + + fn encode<E>(&self, values: &mut E) + where + E: Extend<HeaderValue>, + { + let Self(value) = self; + // Must panic or suppress; the trait provides no other options. + let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value"); + + values.extend(std::iter::once(value)); + } +} diff --git a/src/channel/mod.rs b/src/channel/mod.rs index f67ea04..bc2cc6c 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -1,4 +1,5 @@ pub mod app; +mod header; pub mod repo; mod routes; diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs index fe833b6..b465f61 100644 --- a/src/channel/repo/messages.rs +++ b/src/channel/repo/messages.rs @@ -73,6 +73,7 @@ impl<'c> Messages<'c> { pub async fn for_replay( &mut self, channel: &ChannelId, + resume_at: Option<&DateTime>, ) -> Result<Vec<BroadcastMessage>, BoxedError> { let messages = sqlx::query!( r#" @@ -85,9 +86,11 @@ impl<'c> Messages<'c> { from message join login on message.sender = login.id where channel = $1 + and coalesce(sent_at > $2, true) order by sent_at asc "#, channel, + resume_at, ) .map(|row| BroadcastMessage { id: row.id, diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 0f95c69..4f83a8b 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -8,9 +8,14 @@ use axum::{ routing::{get, post}, Router, }; +use axum_extra::TypedHeader; +use chrono::{format::SecondsFormat, DateTime}; use futures::{future, stream::TryStreamExt as _}; -use super::repo::channels::Id as ChannelId; +use super::{ + header::LastEventId, + repo::{channels::Id as ChannelId, messages::BroadcastMessage}, +}; use crate::{ app::App, clock::RequestedAt, error::BoxedError, error::InternalError, login::repo::logins::Login, @@ -61,13 +66,31 @@ async fn on_events( Path(channel): Path<ChannelId>, State(app): State<App>, _: Login, // requires auth, but doesn't actually care who you are + last_event_id: Option<TypedHeader<LastEventId>>, ) -> Result<impl IntoResponse, InternalError> { + let resume_at = last_event_id + .map(|TypedHeader(header)| header) + .map(|LastEventId(header)| header) + .map(|header| DateTime::parse_from_rfc3339(&header)) + .transpose()? + .map(|ts| ts.to_utc()); + let stream = app .channels() - .events(&channel) + .events(&channel, resume_at.as_ref()) .await? - .and_then(|msg| future::ready(serde_json::to_string(&msg).map_err(BoxedError::from))) - .map_ok(|msg| sse::Event::default().data(&msg)); + .and_then(|msg| future::ready(to_event(msg))); Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default())) } + +fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> { + let data = serde_json::to_string(&msg)?; + let event = sse::Event::default() + .id(msg + .sent_at + .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) + .data(&data); + + Ok(event) +} |
