From 1ee129176eb71f5e246462b66fd9c9862ed1ee7a Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 8 Apr 2025 19:50:14 -0400 Subject: Restart the event connection if heartbeats stop showing up. The changes introduced in the previous commit make it possible to detect lost connections and restart them, so do so. The process is pretty simple - a new remote state is spun up using `/api/boot`, swapped in for the existing state, and a `new EventSource` is started from that new remote state to consume events. This can induce some anomalies. For example, messages that arrive on the server between the loss of one connection and the creation of the next one just "show up" in boot, without ever appearing in the event stream. (This is technically also true on client startup, but it's easier to expect in that situation.) This is something we'll need to consider when implementing things like notifications or unread flags, though the ones we have today, which are state-based, do work fine. By design, this _does not_ retry either the `/api/boot` call or the new event source setup. Event sources will try to reconnect on their own, up to a point, so that's fine, but we need to build something more robust for `/api/boot`. I want to tackle that separately from detecting lost connections and reacting to them, but that does mean that this is not a complete solution to client reconnects. --- ui/lib/session.svelte.js | 60 +++++++++++++++++++++++++++++++++---- ui/lib/state/remote/state.svelte.js | 8 +++-- ui/lib/watchdog.js | 27 +++++++++++++++++ 3 files changed, 87 insertions(+), 8 deletions(-) create mode 100644 ui/lib/watchdog.js (limited to 'ui/lib') diff --git a/ui/lib/session.svelte.js b/ui/lib/session.svelte.js index 67155ab..b953d9c 100644 --- a/ui/lib/session.svelte.js +++ b/ui/lib/session.svelte.js @@ -1,8 +1,11 @@ import { redirect } from '@sveltejs/kit'; +import { goto } from '$app/navigation'; + import * as api from './apiServer.js'; import * as r from './state/remote/state.svelte.js'; import * as l from './state/local/channels.svelte.js'; +import { Watchdog } from './watchdog.js'; class Session { remote = $state(); @@ -16,19 +19,32 @@ class Session { ) ); - static boot({ user, users, channels, messages, resume_point }) { + static boot({ user, users, channels, messages, resume_point, heartbeat }) { const remote = r.State.boot({ currentUser: user, users, channels, messages, - resumePoint: resume_point + resumePoint: resume_point, + heartbeat }); const local = l.Channels.fromLocalStorage(); return new Session(remote, local); } + reboot({ user, users, channels, messages, resume_point, heartbeat }) { + this.remote = r.State.boot({ + currentUser: user, + users, + channels, + messages, + resumePoint: resume_point, + heartbeat + }); + } + constructor(remote, local) { + this.watchdog = new Watchdog(this.watchdogExpired.bind(this)); this.remote = remote; this.local = local; } @@ -36,30 +52,62 @@ class Session { begin() { this.events = api.subscribeToEvents(this.remote.resumePoint); this.events.onmessage = this.onMessage.bind(this); + this.watchdog.reset(this.heartbeatMillis()); } end() { + this.watchdog.stop(); this.events.close(); this.events = null; } + active() { + return this.events !== null; + } + onMessage(message) { const event = JSON.parse(message.data); this.remote.onEvent(event); this.local.retainChannels(this.remote.channels.all.keys()); + this.watchdog.reset(this.heartbeatMillis()); + } + + heartbeatMillis() { + return this.remote.heartbeat /* in seconds */ * 1000 /* millis */; + } + + async watchdogExpired() { + // We leave `this.events` set here as a marker that the interruption is temporary. That's then + // used below, after a potential delay, to decide whether to start the stream back up again or + // not. + this.events.close(); + this.watchdog.stop(); + + const response = await bootOrNavigate(goto); + // Session abandoned; give up here. We need to do this after each await, because that's time in + // which the session may have been abandoned. + if (!this.active()) return; + + this.reboot(response); + this.begin(); } } -export async function boot() { +async function bootOrNavigate(navigateTo) { const response = await api.boot(); switch (response.status) { case 401: - redirect(307, '/login'); + await navigateTo('/login'); break; case 503: - redirect(307, '/setup'); + await navigateTo('/setup'); break; case 200: - return Session.boot(response.data); + return response.data; } } + +export async function boot() { + const response = await bootOrNavigate(async (url) => redirect(307, url)); + return Session.boot(response); +} diff --git a/ui/lib/state/remote/state.svelte.js b/ui/lib/state/remote/state.svelte.js index 6cbe124..29831a0 100644 --- a/ui/lib/state/remote/state.svelte.js +++ b/ui/lib/state/remote/state.svelte.js @@ -8,9 +8,10 @@ export class State { channels = $state(); messages = $state(); - static boot({ currentUser, users, channels, messages, resumePoint }) { + static boot({ currentUser, heartbeat, users, channels, messages, resumePoint }) { return new State({ currentUser, + heartbeat, users: Users.boot(users), channels: Channels.boot(channels), messages: Messages.boot(messages), @@ -18,8 +19,9 @@ export class State { }); } - constructor({ currentUser, users, channels, messages, resumePoint }) { + constructor({ currentUser, heartbeat, users, channels, messages, resumePoint }) { this.currentUser = currentUser; + this.heartbeat = heartbeat; this.users = users; this.channels = channels; this.messages = messages; @@ -27,6 +29,8 @@ export class State { } onEvent(event) { + // Heartbeats are actually completely ignored here. They're handled in `Session`, but not as a + // special case; _any_ event is a heartbeat event. switch (event.type) { case 'channel': return this.onChannelEvent(event); diff --git a/ui/lib/watchdog.js b/ui/lib/watchdog.js new file mode 100644 index 0000000..c95fd4d --- /dev/null +++ b/ui/lib/watchdog.js @@ -0,0 +1,27 @@ +export class Watchdog { + constructor(onExpired) { + this.timeout = null; + this.onExpired = onExpired; + } + + reset(delay) { + if (this.timeout !== null) { + clearTimeout(this.timeout); + } + this.timeout = setTimeout(this.expire.bind(this), delay); + } + + stop() { + if (this.timeout !== null) { + clearTimeout(this.timeout); + this.timeout = null; + } + } + + expire() { + if (this.timeout !== null) { + this.timeout = null; + } + this.onExpired(); + } +} -- cgit v1.2.3