summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-13 22:30:02 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-13 23:12:31 -0400
commit407ca8df6284ce1a4c649b018c7326fd195bbd26 (patch)
tree876091c17efbd765a4c7ef339548c0ff4dfb96d5
parent388a3d5a925aef7ff39339454ae0d720e05f038e (diff)
Support Last-Event-Id as a method of resuming channel events after a disconnect
-rw-r--r--.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json (renamed from .sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json)6
-rw-r--r--Cargo.lock36
-rw-r--r--Cargo.toml3
-rw-r--r--src/channel/app.rs30
-rw-r--r--src/channel/header.rs34
-rw-r--r--src/channel/mod.rs1
-rw-r--r--src/channel/repo/messages.rs3
-rw-r--r--src/channel/routes.rs31
8 files changed, 129 insertions, 15 deletions
diff --git a/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json b/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json
index e5c0eae..b94bb4b 100644
--- a/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json
+++ b/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n order by sent_at asc\n ",
+ "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n and coalesce(sent_at > $2, true)\n order by sent_at asc\n ",
"describe": {
"columns": [
{
@@ -30,7 +30,7 @@
}
],
"parameters": {
- "Right": 1
+ "Right": 2
},
"nullable": [
false,
@@ -40,5 +40,5 @@
false
]
},
- "hash": "d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597"
+ "hash": "64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130"
}
diff --git a/Cargo.lock b/Cargo.lock
index ebcaee1..7268a4b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -203,6 +203,7 @@ dependencies = [
"bytes",
"cookie",
"futures-util",
+ "headers",
"http",
"http-body",
"http-body-util",
@@ -232,6 +233,12 @@ dependencies = [
[[package]]
name = "base64"
+version = "0.21.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
+
+[[package]]
+name = "base64"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
@@ -714,6 +721,30 @@ dependencies = [
]
[[package]]
+name = "headers"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9"
+dependencies = [
+ "base64 0.21.7",
+ "bytes",
+ "headers-core",
+ "http",
+ "httpdate",
+ "mime",
+ "sha1",
+]
+
+[[package]]
+name = "headers-core"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4"
+dependencies = [
+ "http",
+]
+
+[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -742,6 +773,7 @@ dependencies = [
"chrono",
"clap",
"futures",
+ "headers",
"maud",
"password-hash",
"rand",
@@ -1665,7 +1697,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "936cac0ab331b14cb3921c62156d913e4c15b74fb6ec0f3146bd4ef6e4fb3c12"
dependencies = [
"atoi",
- "base64",
+ "base64 0.22.1",
"bitflags 2.6.0",
"byteorder",
"bytes",
@@ -1708,7 +1740,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9734dbce698c67ecf67c442f768a5e90a49b2a4d61a9f1d59f73874bd4cf0710"
dependencies = [
"atoi",
- "base64",
+ "base64 0.22.1",
"bitflags 2.6.0",
"byteorder",
"chrono",
diff --git a/Cargo.toml b/Cargo.toml
index fd74836..1f6893c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,10 +7,11 @@ edition = "2021"
argon2 = "0.5.3"
async-trait = "0.1.82"
axum = { version = "0.7.5", features = ["form"] }
-axum-extra = { version = "0.9.3", features = ["cookie"] }
+axum-extra = { version = "0.9.3", features = ["cookie", "typed-header"] }
chrono = { version = "0.4.38", features = ["serde"] }
clap = { version = "4.5.16", features = ["derive", "env"] }
futures = "0.3.30"
+headers = "0.4.0"
maud = { version = "0.26.0", features = ["axum"] }
password-hash = { version = "0.5.0", features = ["std"] }
rand = "0.8.5"
diff --git a/src/channel/app.rs b/src/channel/app.rs
index e242c2f..c0a6d60 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -2,8 +2,9 @@ use std::collections::{hash_map::Entry, HashMap};
use std::sync::{Arc, Mutex, MutexGuard};
use futures::{
+ future,
stream::{self, StreamExt as _, TryStreamExt as _},
- Stream,
+ TryStream,
};
use sqlx::sqlite::SqlitePool;
use tokio::sync::broadcast::{channel, Sender};
@@ -55,14 +56,33 @@ impl<'a> Channels<'a> {
pub async fn events(
&self,
channel: &ChannelId,
- ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>>, BoxedError> {
- let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from);
+ resume_at: Option<&DateTime>,
+ ) -> Result<impl TryStream<Ok = BroadcastMessage, Error = BoxedError>, BoxedError> {
+ fn skip_stale<E>(
+ resume_at: Option<&DateTime>,
+ ) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> {
+ let resume_at = resume_at.cloned();
+ move |msg| {
+ future::ready(Ok(match resume_at {
+ None => false,
+ Some(resume_at) => msg.sent_at <= resume_at,
+ }))
+ }
+ }
+
+ let live_messages = self
+ .broadcaster
+ .listen(channel)
+ .map_err(BoxedError::from)
+ .try_skip_while(skip_stale(resume_at));
let mut tx = self.db.begin().await?;
- let stored_messages = tx.messages().for_replay(channel).await?;
+ let stored_messages = tx.messages().for_replay(channel, resume_at).await?;
tx.commit().await?;
- Ok(stream::iter(stored_messages).map(Ok).chain(live_messages))
+ let stored_messages = stream::iter(stored_messages).map(Ok);
+
+ Ok(stored_messages.chain(live_messages))
}
}
diff --git a/src/channel/header.rs b/src/channel/header.rs
new file mode 100644
index 0000000..eda8214
--- /dev/null
+++ b/src/channel/header.rs
@@ -0,0 +1,34 @@
+use axum::http::{HeaderName, HeaderValue};
+
+pub struct LastEventId(pub String);
+
+static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");
+
+impl headers::Header for LastEventId {
+ fn name() -> &'static HeaderName {
+ &LAST_EVENT_ID
+ }
+
+ fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
+ where
+ I: Iterator<Item = &'i HeaderValue>,
+ {
+ let value = values.next().ok_or_else(headers::Error::invalid)?;
+ if let Ok(value) = value.to_str() {
+ Ok(Self(value.into()))
+ } else {
+ Err(headers::Error::invalid())
+ }
+ }
+
+ fn encode<E>(&self, values: &mut E)
+ where
+ E: Extend<HeaderValue>,
+ {
+ let Self(value) = self;
+ // Must panic or suppress; the trait provides no other options.
+ let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value");
+
+ values.extend(std::iter::once(value));
+ }
+}
diff --git a/src/channel/mod.rs b/src/channel/mod.rs
index f67ea04..bc2cc6c 100644
--- a/src/channel/mod.rs
+++ b/src/channel/mod.rs
@@ -1,4 +1,5 @@
pub mod app;
+mod header;
pub mod repo;
mod routes;
diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs
index fe833b6..b465f61 100644
--- a/src/channel/repo/messages.rs
+++ b/src/channel/repo/messages.rs
@@ -73,6 +73,7 @@ impl<'c> Messages<'c> {
pub async fn for_replay(
&mut self,
channel: &ChannelId,
+ resume_at: Option<&DateTime>,
) -> Result<Vec<BroadcastMessage>, BoxedError> {
let messages = sqlx::query!(
r#"
@@ -85,9 +86,11 @@ impl<'c> Messages<'c> {
from message
join login on message.sender = login.id
where channel = $1
+ and coalesce(sent_at > $2, true)
order by sent_at asc
"#,
channel,
+ resume_at,
)
.map(|row| BroadcastMessage {
id: row.id,
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index 0f95c69..4f83a8b 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -8,9 +8,14 @@ use axum::{
routing::{get, post},
Router,
};
+use axum_extra::TypedHeader;
+use chrono::{format::SecondsFormat, DateTime};
use futures::{future, stream::TryStreamExt as _};
-use super::repo::channels::Id as ChannelId;
+use super::{
+ header::LastEventId,
+ repo::{channels::Id as ChannelId, messages::BroadcastMessage},
+};
use crate::{
app::App, clock::RequestedAt, error::BoxedError, error::InternalError,
login::repo::logins::Login,
@@ -61,13 +66,31 @@ async fn on_events(
Path(channel): Path<ChannelId>,
State(app): State<App>,
_: Login, // requires auth, but doesn't actually care who you are
+ last_event_id: Option<TypedHeader<LastEventId>>,
) -> Result<impl IntoResponse, InternalError> {
+ let resume_at = last_event_id
+ .map(|TypedHeader(header)| header)
+ .map(|LastEventId(header)| header)
+ .map(|header| DateTime::parse_from_rfc3339(&header))
+ .transpose()?
+ .map(|ts| ts.to_utc());
+
let stream = app
.channels()
- .events(&channel)
+ .events(&channel, resume_at.as_ref())
.await?
- .and_then(|msg| future::ready(serde_json::to_string(&msg).map_err(BoxedError::from)))
- .map_ok(|msg| sse::Event::default().data(&msg));
+ .and_then(|msg| future::ready(to_event(msg)));
Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default()))
}
+
+fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
+ let data = serde_json::to_string(&msg)?;
+ let event = sse::Event::default()
+ .id(msg
+ .sent_at
+ .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
+ .data(&data);
+
+ Ok(event)
+}