From 407ca8df6284ce1a4c649b018c7326fd195bbd26 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 13 Sep 2024 22:30:02 -0400 Subject: Support Last-Event-Id as a method of resuming channel events after a disconnect --- ...99a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json | 44 ++++++++++++++++++++++ ...235d82b01b29f864f64daf1bcb19162b1eb26df597.json | 44 ---------------------- Cargo.lock | 36 +++++++++++++++++- Cargo.toml | 3 +- src/channel/app.rs | 30 ++++++++++++--- src/channel/header.rs | 34 +++++++++++++++++ src/channel/mod.rs | 1 + src/channel/repo/messages.rs | 3 ++ src/channel/routes.rs | 31 +++++++++++++-- 9 files changed, 170 insertions(+), 56 deletions(-) create mode 100644 .sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json delete mode 100644 .sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json create mode 100644 src/channel/header.rs diff --git a/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json b/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json new file mode 100644 index 0000000..b94bb4b --- /dev/null +++ b/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n and coalesce(sent_at > $2, true)\n order by sent_at asc\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender_id: LoginId", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender_name", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130" +} diff --git a/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json b/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json deleted file mode 100644 index e5c0eae..0000000 --- a/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n order by sent_at asc\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender_id: LoginId", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender_name", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "body", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597" -} diff --git a/Cargo.lock b/Cargo.lock index ebcaee1..7268a4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -203,6 +203,7 @@ dependencies = [ "bytes", "cookie", "futures-util", + "headers", "http", "http-body", "http-body-util", @@ -230,6 +231,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -713,6 +720,30 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.5.0" @@ -742,6 +773,7 @@ dependencies = [ "chrono", "clap", "futures", + "headers", "maud", "password-hash", "rand", @@ -1665,7 +1697,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "936cac0ab331b14cb3921c62156d913e4c15b74fb6ec0f3146bd4ef6e4fb3c12" dependencies = [ "atoi", - "base64", + "base64 0.22.1", "bitflags 2.6.0", "byteorder", "bytes", @@ -1708,7 +1740,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9734dbce698c67ecf67c442f768a5e90a49b2a4d61a9f1d59f73874bd4cf0710" dependencies = [ "atoi", - "base64", + "base64 0.22.1", "bitflags 2.6.0", "byteorder", "chrono", diff --git a/Cargo.toml b/Cargo.toml index fd74836..1f6893c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,10 +7,11 @@ edition = "2021" argon2 = "0.5.3" async-trait = "0.1.82" axum = { version = "0.7.5", features = ["form"] } -axum-extra = { version = "0.9.3", features = ["cookie"] } +axum-extra = { version = "0.9.3", features = ["cookie", "typed-header"] } chrono = { version = "0.4.38", features = ["serde"] } clap = { version = "4.5.16", features = ["derive", "env"] } futures = "0.3.30" +headers = "0.4.0" maud = { version = "0.26.0", features = ["axum"] } password-hash = { version = "0.5.0", features = ["std"] } rand = "0.8.5" diff --git a/src/channel/app.rs b/src/channel/app.rs index e242c2f..c0a6d60 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,8 +2,9 @@ use std::collections::{hash_map::Entry, HashMap}; use std::sync::{Arc, Mutex, MutexGuard}; use futures::{ + future, stream::{self, StreamExt as _, TryStreamExt as _}, - Stream, + TryStream, }; use sqlx::sqlite::SqlitePool; use tokio::sync::broadcast::{channel, Sender}; @@ -55,14 +56,33 @@ impl<'a> Channels<'a> { pub async fn events( &self, channel: &ChannelId, - ) -> Result>, BoxedError> { - let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from); + resume_at: Option<&DateTime>, + ) -> Result, BoxedError> { + fn skip_stale( + resume_at: Option<&DateTime>, + ) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready> { + let resume_at = resume_at.cloned(); + move |msg| { + future::ready(Ok(match resume_at { + None => false, + Some(resume_at) => msg.sent_at <= resume_at, + })) + } + } + + let live_messages = self + .broadcaster + .listen(channel) + .map_err(BoxedError::from) + .try_skip_while(skip_stale(resume_at)); let mut tx = self.db.begin().await?; - let stored_messages = tx.messages().for_replay(channel).await?; + let stored_messages = tx.messages().for_replay(channel, resume_at).await?; tx.commit().await?; - Ok(stream::iter(stored_messages).map(Ok).chain(live_messages)) + let stored_messages = stream::iter(stored_messages).map(Ok); + + Ok(stored_messages.chain(live_messages)) } } diff --git a/src/channel/header.rs b/src/channel/header.rs new file mode 100644 index 0000000..eda8214 --- /dev/null +++ b/src/channel/header.rs @@ -0,0 +1,34 @@ +use axum::http::{HeaderName, HeaderValue}; + +pub struct LastEventId(pub String); + +static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); + +impl headers::Header for LastEventId { + fn name() -> &'static HeaderName { + &LAST_EVENT_ID + } + + fn decode<'i, I>(values: &mut I) -> Result + where + I: Iterator, + { + let value = values.next().ok_or_else(headers::Error::invalid)?; + if let Ok(value) = value.to_str() { + Ok(Self(value.into())) + } else { + Err(headers::Error::invalid()) + } + } + + fn encode(&self, values: &mut E) + where + E: Extend, + { + let Self(value) = self; + // Must panic or suppress; the trait provides no other options. + let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value"); + + values.extend(std::iter::once(value)); + } +} diff --git a/src/channel/mod.rs b/src/channel/mod.rs index f67ea04..bc2cc6c 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -1,4 +1,5 @@ pub mod app; +mod header; pub mod repo; mod routes; diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs index fe833b6..b465f61 100644 --- a/src/channel/repo/messages.rs +++ b/src/channel/repo/messages.rs @@ -73,6 +73,7 @@ impl<'c> Messages<'c> { pub async fn for_replay( &mut self, channel: &ChannelId, + resume_at: Option<&DateTime>, ) -> Result, BoxedError> { let messages = sqlx::query!( r#" @@ -85,9 +86,11 @@ impl<'c> Messages<'c> { from message join login on message.sender = login.id where channel = $1 + and coalesce(sent_at > $2, true) order by sent_at asc "#, channel, + resume_at, ) .map(|row| BroadcastMessage { id: row.id, diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 0f95c69..4f83a8b 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -8,9 +8,14 @@ use axum::{ routing::{get, post}, Router, }; +use axum_extra::TypedHeader; +use chrono::{format::SecondsFormat, DateTime}; use futures::{future, stream::TryStreamExt as _}; -use super::repo::channels::Id as ChannelId; +use super::{ + header::LastEventId, + repo::{channels::Id as ChannelId, messages::BroadcastMessage}, +}; use crate::{ app::App, clock::RequestedAt, error::BoxedError, error::InternalError, login::repo::logins::Login, @@ -61,13 +66,31 @@ async fn on_events( Path(channel): Path, State(app): State, _: Login, // requires auth, but doesn't actually care who you are + last_event_id: Option>, ) -> Result { + let resume_at = last_event_id + .map(|TypedHeader(header)| header) + .map(|LastEventId(header)| header) + .map(|header| DateTime::parse_from_rfc3339(&header)) + .transpose()? + .map(|ts| ts.to_utc()); + let stream = app .channels() - .events(&channel) + .events(&channel, resume_at.as_ref()) .await? - .and_then(|msg| future::ready(serde_json::to_string(&msg).map_err(BoxedError::from))) - .map_ok(|msg| sse::Event::default().data(&msg)); + .and_then(|msg| future::ready(to_event(msg))); Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default())) } + +fn to_event(msg: BroadcastMessage) -> Result { + let data = serde_json::to_string(&msg)?; + let event = sse::Event::default() + .id(msg + .sent_at + .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) + .data(&data); + + Ok(event) +} -- cgit v1.2.3