summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2025-04-08 19:50:14 -0400
committerOwen Jacobson <owen@grimoire.ca>2025-04-08 19:50:14 -0400
commit1ee129176eb71f5e246462b66fd9c9862ed1ee7a (patch)
tree9874d3d61f0bdb13913c6c4d079fbb82b336f656
parente2cdb46c3f6707c1b01f8827d8ba491469b5679f (diff)
Restart the event connection if heartbeats stop showing up.
The changes introduced in the previous commit make it possible to detect lost connections and restart them, so do so. The process is pretty simple - a new remote state is spun up using `/api/boot`, swapped in for the existing state, and a `new EventSource` is started from that new remote state to consume events. This can induce some anomalies. For example, messages that arrive on the server between the loss of one connection and the creation of the next one just "show up" in boot, without ever appearing in the event stream. (This is technically also true on client startup, but it's easier to expect in that situation.) This is something we'll need to consider when implementing things like notifications or unread flags, though the ones we have today, which are state-based, do work fine. By design, this _does not_ retry either the `/api/boot` call or the new event source setup. Event sources will try to reconnect on their own, up to a point, so that's fine, but we need to build something more robust for `/api/boot`. I want to tackle that separately from detecting lost connections and reacting to them, but that does mean that this is not a complete solution to client reconnects.
-rw-r--r--ui/lib/session.svelte.js60
-rw-r--r--ui/lib/state/remote/state.svelte.js8
-rw-r--r--ui/lib/watchdog.js27
3 files changed, 87 insertions, 8 deletions
diff --git a/ui/lib/session.svelte.js b/ui/lib/session.svelte.js
index 67155ab..b953d9c 100644
--- a/ui/lib/session.svelte.js
+++ b/ui/lib/session.svelte.js
@@ -1,8 +1,11 @@
import { redirect } from '@sveltejs/kit';
+import { goto } from '$app/navigation';
+
import * as api from './apiServer.js';
import * as r from './state/remote/state.svelte.js';
import * as l from './state/local/channels.svelte.js';
+import { Watchdog } from './watchdog.js';
class Session {
remote = $state();
@@ -16,19 +19,32 @@ class Session {
)
);
- static boot({ user, users, channels, messages, resume_point }) {
+ static boot({ user, users, channels, messages, resume_point, heartbeat }) {
const remote = r.State.boot({
currentUser: user,
users,
channels,
messages,
- resumePoint: resume_point
+ resumePoint: resume_point,
+ heartbeat
});
const local = l.Channels.fromLocalStorage();
return new Session(remote, local);
}
+ reboot({ user, users, channels, messages, resume_point, heartbeat }) {
+ this.remote = r.State.boot({
+ currentUser: user,
+ users,
+ channels,
+ messages,
+ resumePoint: resume_point,
+ heartbeat
+ });
+ }
+
constructor(remote, local) {
+ this.watchdog = new Watchdog(this.watchdogExpired.bind(this));
this.remote = remote;
this.local = local;
}
@@ -36,30 +52,62 @@ class Session {
begin() {
this.events = api.subscribeToEvents(this.remote.resumePoint);
this.events.onmessage = this.onMessage.bind(this);
+ this.watchdog.reset(this.heartbeatMillis());
}
end() {
+ this.watchdog.stop();
this.events.close();
this.events = null;
}
+ active() {
+ return this.events !== null;
+ }
+
onMessage(message) {
const event = JSON.parse(message.data);
this.remote.onEvent(event);
this.local.retainChannels(this.remote.channels.all.keys());
+ this.watchdog.reset(this.heartbeatMillis());
+ }
+
+ heartbeatMillis() {
+ return this.remote.heartbeat /* in seconds */ * 1000 /* millis */;
+ }
+
+ async watchdogExpired() {
+ // We leave `this.events` set here as a marker that the interruption is temporary. That's then
+ // used below, after a potential delay, to decide whether to start the stream back up again or
+ // not.
+ this.events.close();
+ this.watchdog.stop();
+
+ const response = await bootOrNavigate(goto);
+ // Session abandoned; give up here. We need to do this after each await, because that's time in
+ // which the session may have been abandoned.
+ if (!this.active()) return;
+
+ this.reboot(response);
+ this.begin();
}
}
-export async function boot() {
+async function bootOrNavigate(navigateTo) {
const response = await api.boot();
switch (response.status) {
case 401:
- redirect(307, '/login');
+ await navigateTo('/login');
break;
case 503:
- redirect(307, '/setup');
+ await navigateTo('/setup');
break;
case 200:
- return Session.boot(response.data);
+ return response.data;
}
}
+
+export async function boot() {
+ const response = await bootOrNavigate(async (url) => redirect(307, url));
+ return Session.boot(response);
+}
diff --git a/ui/lib/state/remote/state.svelte.js b/ui/lib/state/remote/state.svelte.js
index 6cbe124..29831a0 100644
--- a/ui/lib/state/remote/state.svelte.js
+++ b/ui/lib/state/remote/state.svelte.js
@@ -8,9 +8,10 @@ export class State {
channels = $state();
messages = $state();
- static boot({ currentUser, users, channels, messages, resumePoint }) {
+ static boot({ currentUser, heartbeat, users, channels, messages, resumePoint }) {
return new State({
currentUser,
+ heartbeat,
users: Users.boot(users),
channels: Channels.boot(channels),
messages: Messages.boot(messages),
@@ -18,8 +19,9 @@ export class State {
});
}
- constructor({ currentUser, users, channels, messages, resumePoint }) {
+ constructor({ currentUser, heartbeat, users, channels, messages, resumePoint }) {
this.currentUser = currentUser;
+ this.heartbeat = heartbeat;
this.users = users;
this.channels = channels;
this.messages = messages;
@@ -27,6 +29,8 @@ export class State {
}
onEvent(event) {
+ // Heartbeats are actually completely ignored here. They're handled in `Session`, but not as a
+ // special case; _any_ event is a heartbeat event.
switch (event.type) {
case 'channel':
return this.onChannelEvent(event);
diff --git a/ui/lib/watchdog.js b/ui/lib/watchdog.js
new file mode 100644
index 0000000..c95fd4d
--- /dev/null
+++ b/ui/lib/watchdog.js
@@ -0,0 +1,27 @@
+export class Watchdog {
+ constructor(onExpired) {
+ this.timeout = null;
+ this.onExpired = onExpired;
+ }
+
+ reset(delay) {
+ if (this.timeout !== null) {
+ clearTimeout(this.timeout);
+ }
+ this.timeout = setTimeout(this.expire.bind(this), delay);
+ }
+
+ stop() {
+ if (this.timeout !== null) {
+ clearTimeout(this.timeout);
+ this.timeout = null;
+ }
+ }
+
+ expire() {
+ if (this.timeout !== null) {
+ this.timeout = null;
+ }
+ this.onExpired();
+ }
+}