diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-04-08 19:40:32 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-04-08 19:40:32 -0400 |
| commit | e2cdb46c3f6707c1b01f8827d8ba491469b5679f (patch) | |
| tree | 545e455d893fadc2530e88e68246596aaa877919 /src/event/routes | |
| parent | 0fc3057b05dddb4eba142deeb6373ed37e312c60 (diff) | |
Heartbeats are part of the event protocol.
A heartbeat is an event that the server synthesizes any time an event stream has been idle for longer than some timeout. They allow clients to detect disconnection and network problems, which would otherwise go unnoticed because event streams are a one-way channel. Most network problems only become clear when the offended party tries to _send_ something, and subscribing to an event stream only sends something during the request phase.
Technically, Pilcrow has always sent these, since we started using Axum's SSE support: it defaults to sending a dummy event after 15 seconds (consisting of `":\n\n"`, which is then ignored). I've built Pilcrow's heartbeat support out of that, by customizing the event sent back. The results _mostly_ look like existing events, but there are two key differences:
* Heartbeats don't have `id` fields in the event stream. They're synthetic, and they don't participate in either the "resume at" sequence management, or the last-event-id header-based resumption management.
* Heartbeats have an `event` but no `type` field in the message body. There are no subtypes.
To make it less likely that clients will race with the server on expiring timeouts, heartbeats are sent about five seconds early. In this change, heartbeats are due after 20 seconds, but are sent after 15. If it takes longer than five seconds for a heartbeat to arrive, a client can and should treat that as a network problem and reconnect, but I'd really like to avoid that happening over differences smaller than a second, so I've left a margin.
I originally sketched this out in conversation with @wlonk as having each event carry a deadline for the next one. I ultimately opted not to do that for a few reasons. First, Axum makes it hard - the built-in keep-alive support only works with a static event, and cannot make dynamic ones whose payloads might vary (for example if the deadline is variable). Second, it's complex, to no apparent gain, and adds deadline information to _every_ event type.
This implementation, instead, sends deadline information as part of boot, as a fixed interval in seconds. Clients are responsible for working out deadlines based on message arrivals. This is fine; heartbeat-based connection management is best effort at the best of times, so a few milliseconds of slop in either direction won't hurt anything.
The existing client ignores these events entirely, which is convenient.
The new heartbeat event type is defined alongside the main event type, to make it less likely that we'll inadvertently make changes to one but not the other. We can still do so advertently, I just don't want it to be an accident.
Diffstat (limited to 'src/event/routes')
| -rw-r--r-- | src/event/routes/get.rs | 10 |
1 files changed, 6 insertions, 4 deletions
diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs index 2ca8991..f6c91fa 100644 --- a/src/event/routes/get.rs +++ b/src/event/routes/get.rs @@ -11,7 +11,7 @@ use futures::stream::{Stream, StreamExt as _}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{Event, Sequence, Sequenced as _, app, extract::LastEventId}, + event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId}, token::{app::ValidateError, extract::Identity}, }; @@ -44,9 +44,11 @@ where fn into_response(self) -> response::Response { let Self(stream) = self; let stream = stream.map(sse::Event::try_from); - Sse::new(stream) - .keep_alive(sse::KeepAlive::default()) - .into_response() + let heartbeat = match Heartbeat.try_into().map_err(Internal::from) { + Ok(heartbeat) => heartbeat, + Err(err) => return err.into_response(), + }; + Sse::new(stream).keep_alive(heartbeat).into_response() } } |
