diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-04-10 20:50:13 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-04-10 20:50:13 -0400 |
| commit | 1ef57107b1c355ef896327f0714344277df7ae18 (patch) | |
| tree | 9874d3d61f0bdb13913c6c4d079fbb82b336f656 | |
| parent | 0fc3057b05dddb4eba142deeb6373ed37e312c60 (diff) | |
| parent | 1ee129176eb71f5e246462b66fd9c9862ed1ee7a (diff) | |
Use a heartbeat to allow the client to reconnect after network failures.
| -rw-r--r-- | docs/api/boot.md | 2 | ||||
| -rw-r--r-- | docs/api/events.md | 25 | ||||
| -rw-r--r-- | src/boot/app.rs | 4 | ||||
| -rw-r--r-- | src/boot/mod.rs | 15 | ||||
| -rw-r--r-- | src/event/mod.rs | 45 | ||||
| -rw-r--r-- | src/event/routes/get.rs | 10 | ||||
| -rw-r--r-- | ui/lib/session.svelte.js | 60 | ||||
| -rw-r--r-- | ui/lib/state/remote/state.svelte.js | 8 | ||||
| -rw-r--r-- | ui/lib/watchdog.js | 27 |
9 files changed, 176 insertions, 20 deletions
diff --git a/docs/api/boot.md b/docs/api/boot.md index 0c2dc08..46b972f 100644 --- a/docs/api/boot.md +++ b/docs/api/boot.md @@ -42,6 +42,7 @@ This endpoint will respond with a status of "id": "U1234abcd" }, "resume_point": 1312, + "heartbeat": 30, "users": [ { "id": "U1234abcd", @@ -72,6 +73,7 @@ The response will include the following fields: |:---------------|:----------------|:-------------------------------------------------------------------------------------------------------------------------| | `user` | object | The details of the caller's identity. | | `resume_point` | integer | A resume point for [events](./events.md), such that the event stream will begin immediately after the included snapshot. | +| `heartbeat` | integer | The [heartbeat timeout](./events.md#heartbeat-events), in seconds, for events. | | `users` | array of object | A snapshot of the users present in the service. | | `channels` | array of object | A snapshot of the channels present in the service. | | `messages` | array of object | A snapshot of the messages present in the service. | diff --git a/docs/api/events.md b/docs/api/events.md index 3347a26..7fc7d78 100644 --- a/docs/api/events.md +++ b/docs/api/events.md @@ -86,12 +86,27 @@ The service may terminate the connection at any time. Clients should reconnect a Each event's `data` consists of a JSON object describing one event. Every event includes the following fields: -| Field | Type | Description | -|:--------|:-------|:-------------------------------------------------------------------------------------------------------------| -| `type` | string | The type of entity the event describes. Will be one of the types listed in the next section. | -| `event` | string | The specific kind of event. Will be one of the events listed with the associated `type` in the next section. | +| Field | Type | Description | +|:--------|:-----------------|:-------------------------------------------------------------------------------------------------------------| +| `type` | string | The type of entity the event describes. Will be one of the types listed in the next section. | +| `event` | string, optional | The specific kind of event. Will be one of the events listed with the associated `type` in the next section. | -The remaining fields depend on the `type` and `event` field. +The remaining fields depend on the `type` and (if present) the `event` field. + + +## Heartbeat events + +```json +{ + "type": "heartbeat" +} +``` + +To help clients detect network interruptions, the service guarantees that it will deliver an event after a fixed interval called the "heartbeat interval." The specific interval length is given in seconds as part of the [boot response](./boot.md). If the service determines that the heartbeat interval is close to expiring, it will synthesize and deliver a heartbeat event. + +Clients should treat any period of time without events, longer than the heartbeat interval, as an indication that the event stream may have been interrupted. Clients may also use other techniques, such as [browser APIs](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/error_event), to detect this condition and restart the connection. + +These events have the `type` field set to `"heartbeat"`. The `event` field is absent. ## User events diff --git a/src/boot/app.rs b/src/boot/app.rs index f531afe..cd45c38 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -3,7 +3,7 @@ use sqlx::sqlite::SqlitePool; use super::Snapshot; use crate::{ channel::{self, repo::Provider as _}, - event::repo::Provider as _, + event::{Heartbeat, repo::Provider as _}, message::repo::Provider as _, name, user::{self, repo::Provider as _}, @@ -21,6 +21,7 @@ impl<'a> Boot<'a> { pub async fn snapshot(&self) -> Result<Snapshot, Error> { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; + let heartbeat = Heartbeat::TIMEOUT; let users = tx.users().all(resume_point).await?; let channels = tx.channels().all(resume_point).await?; @@ -45,6 +46,7 @@ impl<'a> Boot<'a> { Ok(Snapshot { resume_point, + heartbeat, users, channels, messages, diff --git a/src/boot/mod.rs b/src/boot/mod.rs index c52b088..122bd53 100644 --- a/src/boot/mod.rs +++ b/src/boot/mod.rs @@ -1,14 +1,25 @@ +use crate::{channel::Channel, event::Sequence, message::Message, user::User}; +use serde::Serialize; +use std::time::Duration; + pub mod app; mod routes; -use crate::{channel::Channel, event::Sequence, message::Message, user::User}; - pub use self::routes::router; #[derive(serde::Serialize)] pub struct Snapshot { pub resume_point: Sequence, + #[serde(serialize_with = "as_seconds")] + pub heartbeat: Duration, pub users: Vec<User>, pub channels: Vec<Channel>, pub messages: Vec<Message>, } + +fn as_seconds<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error> +where + S: serde::Serializer, +{ + duration.as_secs().serialize(serializer) +} diff --git a/src/event/mod.rs b/src/event/mod.rs index 3ab88ec..1f2ec42 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1,4 +1,7 @@ use crate::{channel, message, user}; +use axum::response::sse; +use axum::response::sse::KeepAlive; +use std::time::Duration; pub mod app; mod broadcaster; @@ -21,6 +24,16 @@ pub enum Event { Message(message::Event), } +// Serialized representation is intended to look like the serialized representation of `Event`, +// above - though heartbeat events contain only a type field and none of the other event gubbins. +// They don't have to participate in sequence numbering, aren't generated from stored data, and +// generally Are Weird. +#[derive(serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Heartbeat { + Heartbeat, +} + impl Sequenced for Event { fn instant(&self) -> Instant { match self { @@ -48,3 +61,35 @@ impl From<message::Event> for Event { Self::Message(event) } } + +impl Heartbeat { + // The following values are a first-rough-guess attempt to balance noticing connection problems + // quickly with managing the (modest) costs of delivering and processing heartbeats. Feel + // encouraged to tune them if you have a better idea on how to set them! + + // Advise clients to expect heartbeats this often + pub const TIMEOUT: Duration = Duration::from_secs(20); + // Actually send heartbeats this often; this is shorter to allow time for the heartbeat to + // arrive before the advised deadline. + pub const INTERVAL: Duration = Duration::from_secs(15); +} + +impl TryFrom<Heartbeat> for sse::Event { + type Error = serde_json::Error; + + fn try_from(heartbeat: Heartbeat) -> Result<sse::Event, Self::Error> { + let heartbeat = serde_json::to_string_pretty(&heartbeat)?; + let heartbeat = sse::Event::default().data(heartbeat); + Ok(heartbeat) + } +} + +impl TryFrom<Heartbeat> for sse::KeepAlive { + type Error = <sse::Event as TryFrom<Heartbeat>>::Error; + + fn try_from(heartbeat: Heartbeat) -> Result<sse::KeepAlive, Self::Error> { + let event = heartbeat.try_into()?; + let keep_alive = KeepAlive::new().interval(Heartbeat::INTERVAL).event(event); + Ok(keep_alive) + } +} diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs index 2ca8991..f6c91fa 100644 --- a/src/event/routes/get.rs +++ b/src/event/routes/get.rs @@ -11,7 +11,7 @@ use futures::stream::{Stream, StreamExt as _}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{Event, Sequence, Sequenced as _, app, extract::LastEventId}, + event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId}, token::{app::ValidateError, extract::Identity}, }; @@ -44,9 +44,11 @@ where fn into_response(self) -> response::Response { let Self(stream) = self; let stream = stream.map(sse::Event::try_from); - Sse::new(stream) - .keep_alive(sse::KeepAlive::default()) - .into_response() + let heartbeat = match Heartbeat.try_into().map_err(Internal::from) { + Ok(heartbeat) => heartbeat, + Err(err) => return err.into_response(), + }; + Sse::new(stream).keep_alive(heartbeat).into_response() } } diff --git a/ui/lib/session.svelte.js b/ui/lib/session.svelte.js index 67155ab..b953d9c 100644 --- a/ui/lib/session.svelte.js +++ b/ui/lib/session.svelte.js @@ -1,8 +1,11 @@ import { redirect } from '@sveltejs/kit'; +import { goto } from '$app/navigation'; + import * as api from './apiServer.js'; import * as r from './state/remote/state.svelte.js'; import * as l from './state/local/channels.svelte.js'; +import { Watchdog } from './watchdog.js'; class Session { remote = $state(); @@ -16,19 +19,32 @@ class Session { ) ); - static boot({ user, users, channels, messages, resume_point }) { + static boot({ user, users, channels, messages, resume_point, heartbeat }) { const remote = r.State.boot({ currentUser: user, users, channels, messages, - resumePoint: resume_point + resumePoint: resume_point, + heartbeat }); const local = l.Channels.fromLocalStorage(); return new Session(remote, local); } + reboot({ user, users, channels, messages, resume_point, heartbeat }) { + this.remote = r.State.boot({ + currentUser: user, + users, + channels, + messages, + resumePoint: resume_point, + heartbeat + }); + } + constructor(remote, local) { + this.watchdog = new Watchdog(this.watchdogExpired.bind(this)); this.remote = remote; this.local = local; } @@ -36,30 +52,62 @@ class Session { begin() { this.events = api.subscribeToEvents(this.remote.resumePoint); this.events.onmessage = this.onMessage.bind(this); + this.watchdog.reset(this.heartbeatMillis()); } end() { + this.watchdog.stop(); this.events.close(); this.events = null; } + active() { + return this.events !== null; + } + onMessage(message) { const event = JSON.parse(message.data); this.remote.onEvent(event); this.local.retainChannels(this.remote.channels.all.keys()); + this.watchdog.reset(this.heartbeatMillis()); + } + + heartbeatMillis() { + return this.remote.heartbeat /* in seconds */ * 1000 /* millis */; + } + + async watchdogExpired() { + // We leave `this.events` set here as a marker that the interruption is temporary. That's then + // used below, after a potential delay, to decide whether to start the stream back up again or + // not. + this.events.close(); + this.watchdog.stop(); + + const response = await bootOrNavigate(goto); + // Session abandoned; give up here. We need to do this after each await, because that's time in + // which the session may have been abandoned. + if (!this.active()) return; + + this.reboot(response); + this.begin(); } } -export async function boot() { +async function bootOrNavigate(navigateTo) { const response = await api.boot(); switch (response.status) { case 401: - redirect(307, '/login'); + await navigateTo('/login'); break; case 503: - redirect(307, '/setup'); + await navigateTo('/setup'); break; case 200: - return Session.boot(response.data); + return response.data; } } + +export async function boot() { + const response = await bootOrNavigate(async (url) => redirect(307, url)); + return Session.boot(response); +} diff --git a/ui/lib/state/remote/state.svelte.js b/ui/lib/state/remote/state.svelte.js index 6cbe124..29831a0 100644 --- a/ui/lib/state/remote/state.svelte.js +++ b/ui/lib/state/remote/state.svelte.js @@ -8,9 +8,10 @@ export class State { channels = $state(); messages = $state(); - static boot({ currentUser, users, channels, messages, resumePoint }) { + static boot({ currentUser, heartbeat, users, channels, messages, resumePoint }) { return new State({ currentUser, + heartbeat, users: Users.boot(users), channels: Channels.boot(channels), messages: Messages.boot(messages), @@ -18,8 +19,9 @@ export class State { }); } - constructor({ currentUser, users, channels, messages, resumePoint }) { + constructor({ currentUser, heartbeat, users, channels, messages, resumePoint }) { this.currentUser = currentUser; + this.heartbeat = heartbeat; this.users = users; this.channels = channels; this.messages = messages; @@ -27,6 +29,8 @@ export class State { } onEvent(event) { + // Heartbeats are actually completely ignored here. They're handled in `Session`, but not as a + // special case; _any_ event is a heartbeat event. switch (event.type) { case 'channel': return this.onChannelEvent(event); diff --git a/ui/lib/watchdog.js b/ui/lib/watchdog.js new file mode 100644 index 0000000..c95fd4d --- /dev/null +++ b/ui/lib/watchdog.js @@ -0,0 +1,27 @@ +export class Watchdog { + constructor(onExpired) { + this.timeout = null; + this.onExpired = onExpired; + } + + reset(delay) { + if (this.timeout !== null) { + clearTimeout(this.timeout); + } + this.timeout = setTimeout(this.expire.bind(this), delay); + } + + stop() { + if (this.timeout !== null) { + clearTimeout(this.timeout); + this.timeout = null; + } + } + + expire() { + if (this.timeout !== null) { + this.timeout = null; + } + this.onExpired(); + } +} |
