From e2cdb46c3f6707c1b01f8827d8ba491469b5679f Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 8 Apr 2025 19:40:32 -0400 Subject: Heartbeats are part of the event protocol. A heartbeat is an event that the server synthesizes any time an event stream has been idle for longer than some timeout. They allow clients to detect disconnection and network problems, which would otherwise go unnoticed because event streams are a one-way channel. Most network problems only become clear when the offended party tries to _send_ something, and subscribing to an event stream only sends something during the request phase. Technically, Pilcrow has always sent these, since we started using Axum's SSE support: it defaults to sending a dummy event after 15 seconds (consisting of `":\n\n"`, which is then ignored). I've built Pilcrow's heartbeat support out of that, by customizing the event sent back. The results _mostly_ look like existing events, but there are two key differences: * Heartbeats don't have `id` fields in the event stream. They're synthetic, and they don't participate in either the "resume at" sequence management, or the last-event-id header-based resumption management. * Heartbeats have a `type` but no `event` field in the message body. There are no subtypes. To make it less likely that clients will race with the server on expiring timeouts, heartbeats are sent about five seconds early. In this change, heartbeats are due after 20 seconds, but are sent after 15. If it takes longer than five seconds for a heartbeat to arrive, a client can and should treat that as a network problem and reconnect, but I'd really like to avoid that happening over differences smaller than a second, so I've left a margin. I originally sketched this out in conversation with @wlonk as having each event carry a deadline for the next one. I ultimately opted not to do that for a few reasons. 
First, Axum makes it hard - the built-in keep-alive support only works with a static event, and cannot make dynamic ones whose payloads might vary (for example if the deadline is variable). Second, it's complex, to no apparent gain, and adds deadline information to _every_ event type. This implementation, instead, sends deadline information as part of boot, as a fixed interval in seconds. Clients are responsible for working out deadlines based on message arrivals. This is fine; heartbeat-based connection management is best effort at the best of times, so a few milliseconds of slop in either direction won't hurt anything. The existing client ignores these events entirely, which is convenient. The new heartbeat event type is defined alongside the main event type, to make it less likely that we'll inadvertently make changes to one but not the other. We can still do so advertently, I just don't want it to be an accident. --- docs/api/boot.md | 2 ++ docs/api/events.md | 25 ++++++++++++++++++++----- src/boot/app.rs | 4 +++- src/boot/mod.rs | 15 +++++++++++++-- src/event/mod.rs | 45 +++++++++++++++++++++++++++++++++++++++++++++ src/event/routes/get.rs | 10 ++++++---- 6 files changed, 89 insertions(+), 12 deletions(-) diff --git a/docs/api/boot.md b/docs/api/boot.md index 0c2dc08..46b972f 100644 --- a/docs/api/boot.md +++ b/docs/api/boot.md @@ -42,6 +42,7 @@ This endpoint will respond with a status of "id": "U1234abcd" }, "resume_point": 1312, + "heartbeat": 30, "users": [ { "id": "U1234abcd", @@ -72,6 +73,7 @@ The response will include the following fields: |:---------------|:----------------|:-------------------------------------------------------------------------------------------------------------------------| | `user` | object | The details of the caller's identity. | | `resume_point` | integer | A resume point for [events](./events.md), such that the event stream will begin immediately after the included snapshot. 
| +| `heartbeat` | integer | The [heartbeat timeout](./events.md#heartbeat-events), in seconds, for events. | | `users` | array of object | A snapshot of the users present in the service. | | `channels` | array of object | A snapshot of the channels present in the service. | | `messages` | array of object | A snapshot of the messages present in the service. | diff --git a/docs/api/events.md b/docs/api/events.md index 3347a26..7fc7d78 100644 --- a/docs/api/events.md +++ b/docs/api/events.md @@ -86,12 +86,27 @@ The service may terminate the connection at any time. Clients should reconnect a Each event's `data` consists of a JSON object describing one event. Every event includes the following fields: -| Field | Type | Description | -|:--------|:-------|:-------------------------------------------------------------------------------------------------------------| -| `type` | string | The type of entity the event describes. Will be one of the types listed in the next section. | -| `event` | string | The specific kind of event. Will be one of the events listed with the associated `type` in the next section. | +| Field | Type | Description | +|:--------|:-----------------|:-------------------------------------------------------------------------------------------------------------| +| `type` | string | The type of entity the event describes. Will be one of the types listed in the next section. | +| `event` | string, optional | The specific kind of event. Will be one of the events listed with the associated `type` in the next section. | -The remaining fields depend on the `type` and `event` field. +The remaining fields depend on the `type` and (if present) the `event` field. + + +## Heartbeat events + +```json +{ + "type": "heartbeat" +} +``` + +To help clients detect network interruptions, the service guarantees that it will deliver an event after a fixed interval called the "heartbeat interval." 
The specific interval length is given in seconds as part of the [boot response](./boot.md). If the service determines that the heartbeat interval is close to expiring, it will synthesize and deliver a heartbeat event. + +Clients should treat any period of time without events, longer than the heartbeat interval, as an indication that the event stream may have been interrupted. Clients may also use other techniques, such as [browser APIs](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/error_event), to detect this condition and restart the connection. + +These events have the `type` field set to `"heartbeat"`. The `event` field is absent. ## User events diff --git a/src/boot/app.rs b/src/boot/app.rs index f531afe..cd45c38 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -3,7 +3,7 @@ use sqlx::sqlite::SqlitePool; use super::Snapshot; use crate::{ channel::{self, repo::Provider as _}, - event::repo::Provider as _, + event::{Heartbeat, repo::Provider as _}, message::repo::Provider as _, name, user::{self, repo::Provider as _}, @@ -21,6 +21,7 @@ impl<'a> Boot<'a> { pub async fn snapshot(&self) -> Result { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; + let heartbeat = Heartbeat::TIMEOUT; let users = tx.users().all(resume_point).await?; let channels = tx.channels().all(resume_point).await?; @@ -45,6 +46,7 @@ impl<'a> Boot<'a> { Ok(Snapshot { resume_point, + heartbeat, users, channels, messages, diff --git a/src/boot/mod.rs b/src/boot/mod.rs index c52b088..122bd53 100644 --- a/src/boot/mod.rs +++ b/src/boot/mod.rs @@ -1,14 +1,25 @@ +use crate::{channel::Channel, event::Sequence, message::Message, user::User}; +use serde::Serialize; +use std::time::Duration; + pub mod app; mod routes; -use crate::{channel::Channel, event::Sequence, message::Message, user::User}; - pub use self::routes::router; #[derive(serde::Serialize)] pub struct Snapshot { pub resume_point: Sequence, + #[serde(serialize_with = "as_seconds")] + 
pub heartbeat: Duration, pub users: Vec, pub channels: Vec, pub messages: Vec, } + +fn as_seconds(duration: &Duration, serializer: S) -> Result +where + S: serde::Serializer, +{ + duration.as_secs().serialize(serializer) +} diff --git a/src/event/mod.rs b/src/event/mod.rs index 3ab88ec..1f2ec42 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1,4 +1,7 @@ use crate::{channel, message, user}; +use axum::response::sse; +use axum::response::sse::KeepAlive; +use std::time::Duration; pub mod app; mod broadcaster; @@ -21,6 +24,16 @@ pub enum Event { Message(message::Event), } +// Serialized representation is intended to look like the serialized representation of `Event`, +// above - though heartbeat events contain only a type field and none of the other event gubbins. +// They don't have to participate in sequence numbering, aren't generated from stored data, and +// generally Are Weird. +#[derive(serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Heartbeat { + Heartbeat, +} + impl Sequenced for Event { fn instant(&self) -> Instant { match self { @@ -48,3 +61,35 @@ impl From for Event { Self::Message(event) } } + +impl Heartbeat { + // The following values are a first-rough-guess attempt to balance noticing connection problems + // quickly with managing the (modest) costs of delivering and processing heartbeats. Feel + // encouraged to tune them if you have a better idea on how to set them! + + // Advise clients to expect heartbeats this often + pub const TIMEOUT: Duration = Duration::from_secs(20); + // Actually send heartbeats this often; this is shorter to allow time for the heartbeat to + // arrive before the advised deadline. 
+ pub const INTERVAL: Duration = Duration::from_secs(15); +} + +impl TryFrom for sse::Event { + type Error = serde_json::Error; + + fn try_from(heartbeat: Heartbeat) -> Result { + let heartbeat = serde_json::to_string_pretty(&heartbeat)?; + let heartbeat = sse::Event::default().data(heartbeat); + Ok(heartbeat) + } +} + +impl TryFrom for sse::KeepAlive { + type Error = >::Error; + + fn try_from(heartbeat: Heartbeat) -> Result { + let event = heartbeat.try_into()?; + let keep_alive = KeepAlive::new().interval(Heartbeat::INTERVAL).event(event); + Ok(keep_alive) + } +} diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs index 2ca8991..f6c91fa 100644 --- a/src/event/routes/get.rs +++ b/src/event/routes/get.rs @@ -11,7 +11,7 @@ use futures::stream::{Stream, StreamExt as _}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{Event, Sequence, Sequenced as _, app, extract::LastEventId}, + event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId}, token::{app::ValidateError, extract::Identity}, }; @@ -44,9 +44,11 @@ where fn into_response(self) -> response::Response { let Self(stream) = self; let stream = stream.map(sse::Event::try_from); - Sse::new(stream) - .keep_alive(sse::KeepAlive::default()) - .into_response() + let heartbeat = match Heartbeat.try_into().map_err(Internal::from) { + Ok(heartbeat) => heartbeat, + Err(err) => return err.into_response(), + }; + Sse::new(stream).keep_alive(heartbeat).into_response() } } -- cgit v1.2.3 From 1ee129176eb71f5e246462b66fd9c9862ed1ee7a Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 8 Apr 2025 19:50:14 -0400 Subject: Restart the event connection if heartbeats stop showing up. The changes introduced in the previous commit make it possible to detect lost connections and restart them, so do so. 
The process is pretty simple - a new remote state is spun up using `/api/boot`, swapped in for the existing state, and a `new EventSource` is started from that new remote state to consume events. This can induce some anomalies. For example, messages that arrive on the server between the loss of one connection and the creation of the next one just "show up" in boot, without ever appearing in the event stream. (This is technically also true on client startup, but it's easier to expect in that situation.) This is something we'll need to consider when implementing things like notifications or unread flags, though the ones we have today, which are state-based, do work fine. By design, this _does not_ retry either the `/api/boot` call or the new event source setup. Event sources will try to reconnect on their own, up to a point, so that's fine, but we need to build something more robust for `/api/boot`. I want to tackle that separately from detecting lost connections and reacting to them, but that does mean that this is not a complete solution to client reconnects. 
--- ui/lib/session.svelte.js | 60 +++++++++++++++++++++++++++++++++---- ui/lib/state/remote/state.svelte.js | 8 +++-- ui/lib/watchdog.js | 27 +++++++++++++++++ 3 files changed, 87 insertions(+), 8 deletions(-) create mode 100644 ui/lib/watchdog.js diff --git a/ui/lib/session.svelte.js b/ui/lib/session.svelte.js index 67155ab..b953d9c 100644 --- a/ui/lib/session.svelte.js +++ b/ui/lib/session.svelte.js @@ -1,8 +1,11 @@ import { redirect } from '@sveltejs/kit'; +import { goto } from '$app/navigation'; + import * as api from './apiServer.js'; import * as r from './state/remote/state.svelte.js'; import * as l from './state/local/channels.svelte.js'; +import { Watchdog } from './watchdog.js'; class Session { remote = $state(); @@ -16,19 +19,32 @@ class Session { ) ); - static boot({ user, users, channels, messages, resume_point }) { + static boot({ user, users, channels, messages, resume_point, heartbeat }) { const remote = r.State.boot({ currentUser: user, users, channels, messages, - resumePoint: resume_point + resumePoint: resume_point, + heartbeat }); const local = l.Channels.fromLocalStorage(); return new Session(remote, local); } + reboot({ user, users, channels, messages, resume_point, heartbeat }) { + this.remote = r.State.boot({ + currentUser: user, + users, + channels, + messages, + resumePoint: resume_point, + heartbeat + }); + } + constructor(remote, local) { + this.watchdog = new Watchdog(this.watchdogExpired.bind(this)); this.remote = remote; this.local = local; } @@ -36,30 +52,62 @@ class Session { begin() { this.events = api.subscribeToEvents(this.remote.resumePoint); this.events.onmessage = this.onMessage.bind(this); + this.watchdog.reset(this.heartbeatMillis()); } end() { + this.watchdog.stop(); this.events.close(); this.events = null; } + active() { + return this.events !== null; + } + onMessage(message) { const event = JSON.parse(message.data); this.remote.onEvent(event); this.local.retainChannels(this.remote.channels.all.keys()); + 
this.watchdog.reset(this.heartbeatMillis()); + } + + heartbeatMillis() { + return this.remote.heartbeat /* in seconds */ * 1000 /* millis */; + } + + async watchdogExpired() { + // We leave `this.events` set here as a marker that the interruption is temporary. That's then + // used below, after a potential delay, to decide whether to start the stream back up again or + // not. + this.events.close(); + this.watchdog.stop(); + + const response = await bootOrNavigate(goto); + // Session abandoned; give up here. We need to do this after each await, because that's time in + // which the session may have been abandoned. + if (!this.active()) return; + + this.reboot(response); + this.begin(); } } -export async function boot() { +async function bootOrNavigate(navigateTo) { const response = await api.boot(); switch (response.status) { case 401: - redirect(307, '/login'); + await navigateTo('/login'); break; case 503: - redirect(307, '/setup'); + await navigateTo('/setup'); break; case 200: - return Session.boot(response.data); + return response.data; } } + +export async function boot() { + const response = await bootOrNavigate(async (url) => redirect(307, url)); + return Session.boot(response); +} diff --git a/ui/lib/state/remote/state.svelte.js b/ui/lib/state/remote/state.svelte.js index 6cbe124..29831a0 100644 --- a/ui/lib/state/remote/state.svelte.js +++ b/ui/lib/state/remote/state.svelte.js @@ -8,9 +8,10 @@ export class State { channels = $state(); messages = $state(); - static boot({ currentUser, users, channels, messages, resumePoint }) { + static boot({ currentUser, heartbeat, users, channels, messages, resumePoint }) { return new State({ currentUser, + heartbeat, users: Users.boot(users), channels: Channels.boot(channels), messages: Messages.boot(messages), @@ -18,8 +19,9 @@ export class State { }); } - constructor({ currentUser, users, channels, messages, resumePoint }) { + constructor({ currentUser, heartbeat, users, channels, messages, resumePoint }) { 
this.currentUser = currentUser; + this.heartbeat = heartbeat; this.users = users; this.channels = channels; this.messages = messages; @@ -27,6 +29,8 @@ export class State { } onEvent(event) { + // Heartbeats are actually completely ignored here. They're handled in `Session`, but not as a + // special case; _any_ event is a heartbeat event. switch (event.type) { case 'channel': return this.onChannelEvent(event); diff --git a/ui/lib/watchdog.js b/ui/lib/watchdog.js new file mode 100644 index 0000000..c95fd4d --- /dev/null +++ b/ui/lib/watchdog.js @@ -0,0 +1,27 @@ +export class Watchdog { + constructor(onExpired) { + this.timeout = null; + this.onExpired = onExpired; + } + + reset(delay) { + if (this.timeout !== null) { + clearTimeout(this.timeout); + } + this.timeout = setTimeout(this.expire.bind(this), delay); + } + + stop() { + if (this.timeout !== null) { + clearTimeout(this.timeout); + this.timeout = null; + } + } + + expire() { + if (this.timeout !== null) { + this.timeout = null; + } + this.onExpired(); + } +} -- cgit v1.2.3