summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/api/boot.md2
-rw-r--r--docs/api/events.md25
-rw-r--r--src/boot/app.rs4
-rw-r--r--src/boot/mod.rs15
-rw-r--r--src/event/mod.rs45
-rw-r--r--src/event/routes/get.rs10
6 files changed, 89 insertions, 12 deletions
diff --git a/docs/api/boot.md b/docs/api/boot.md
index 0c2dc08..46b972f 100644
--- a/docs/api/boot.md
+++ b/docs/api/boot.md
@@ -42,6 +42,7 @@ This endpoint will respond with a status of
"id": "U1234abcd"
},
"resume_point": 1312,
+ "heartbeat": 30,
"users": [
{
"id": "U1234abcd",
@@ -72,6 +73,7 @@ The response will include the following fields:
|:---------------|:----------------|:-------------------------------------------------------------------------------------------------------------------------|
| `user` | object | The details of the caller's identity. |
| `resume_point` | integer | A resume point for [events](./events.md), such that the event stream will begin immediately after the included snapshot. |
+| `heartbeat` | integer | The [heartbeat timeout](./events.md#heartbeat-events), in seconds, for events. |
| `users` | array of object | A snapshot of the users present in the service. |
| `channels` | array of object | A snapshot of the channels present in the service. |
| `messages` | array of object | A snapshot of the messages present in the service. |
diff --git a/docs/api/events.md b/docs/api/events.md
index 3347a26..7fc7d78 100644
--- a/docs/api/events.md
+++ b/docs/api/events.md
@@ -86,12 +86,27 @@ The service may terminate the connection at any time. Clients should reconnect a
Each event's `data` consists of a JSON object describing one event. Every event includes the following fields:
-| Field | Type | Description |
-|:--------|:-------|:-------------------------------------------------------------------------------------------------------------|
-| `type` | string | The type of entity the event describes. Will be one of the types listed in the next section. |
-| `event` | string | The specific kind of event. Will be one of the events listed with the associated `type` in the next section. |
+| Field | Type | Description |
+|:--------|:-----------------|:-------------------------------------------------------------------------------------------------------------|
+| `type` | string | The type of entity the event describes. Will be one of the types listed in the next section. |
+| `event` | string, optional | The specific kind of event. Will be one of the events listed with the associated `type` in the next section. |
-The remaining fields depend on the `type` and `event` field.
+The remaining fields depend on the `type` and (if present) the `event` field.
+
+
+## Heartbeat events
+
+```json
+{
+ "type": "heartbeat"
+}
+```
+
+To help clients detect network interruptions, the service guarantees that it will deliver an event after a fixed interval called the "heartbeat interval." The specific interval length is given in seconds as part of the [boot response](./boot.md). If the service determines that the heartbeat interval is close to expiring, it will synthesize and deliver a heartbeat event.
+
+Clients should treat any period of time without events, longer than the heartbeat interval, as an indication that the event stream may have been interrupted. Clients may also use other techniques, such as [browser APIs](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/error_event), to detect this condition and restart the connection.
+
+These events have the `type` field set to `"heartbeat"`. The `event` field is absent.
## User events
diff --git a/src/boot/app.rs b/src/boot/app.rs
index f531afe..cd45c38 100644
--- a/src/boot/app.rs
+++ b/src/boot/app.rs
@@ -3,7 +3,7 @@ use sqlx::sqlite::SqlitePool;
use super::Snapshot;
use crate::{
channel::{self, repo::Provider as _},
- event::repo::Provider as _,
+ event::{Heartbeat, repo::Provider as _},
message::repo::Provider as _,
name,
user::{self, repo::Provider as _},
@@ -21,6 +21,7 @@ impl<'a> Boot<'a> {
pub async fn snapshot(&self) -> Result<Snapshot, Error> {
let mut tx = self.db.begin().await?;
let resume_point = tx.sequence().current().await?;
+ let heartbeat = Heartbeat::TIMEOUT;
let users = tx.users().all(resume_point).await?;
let channels = tx.channels().all(resume_point).await?;
@@ -45,6 +46,7 @@ impl<'a> Boot<'a> {
Ok(Snapshot {
resume_point,
+ heartbeat,
users,
channels,
messages,
diff --git a/src/boot/mod.rs b/src/boot/mod.rs
index c52b088..122bd53 100644
--- a/src/boot/mod.rs
+++ b/src/boot/mod.rs
@@ -1,14 +1,25 @@
+use crate::{channel::Channel, event::Sequence, message::Message, user::User};
+use serde::Serialize;
+use std::time::Duration;
+
pub mod app;
mod routes;
-use crate::{channel::Channel, event::Sequence, message::Message, user::User};
-
pub use self::routes::router;
#[derive(serde::Serialize)]
pub struct Snapshot {
pub resume_point: Sequence,
+ #[serde(serialize_with = "as_seconds")]
+ pub heartbeat: Duration,
pub users: Vec<User>,
pub channels: Vec<Channel>,
pub messages: Vec<Message>,
}
+
+fn as_seconds<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
+where
+ S: serde::Serializer,
+{
+ duration.as_secs().serialize(serializer)
+}
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 3ab88ec..1f2ec42 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -1,4 +1,7 @@
use crate::{channel, message, user};
+use axum::response::sse;
+use axum::response::sse::KeepAlive;
+use std::time::Duration;
pub mod app;
mod broadcaster;
@@ -21,6 +24,16 @@ pub enum Event {
Message(message::Event),
}
+// Serialized representation is intended to look like the serialized representation of `Event`,
+// above - though heartbeat events contain only a type field and none of the other event gubbins.
+// They don't have to participate in sequence numbering, aren't generated from stored data, and
+// generally Are Weird.
+#[derive(serde::Serialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum Heartbeat {
+ Heartbeat,
+}
+
impl Sequenced for Event {
fn instant(&self) -> Instant {
match self {
@@ -48,3 +61,35 @@ impl From<message::Event> for Event {
Self::Message(event)
}
}
+
+impl Heartbeat {
+ // The following values are a first-rough-guess attempt to balance noticing connection problems
+ // quickly with managing the (modest) costs of delivering and processing heartbeats. Feel
+ // encouraged to tune them if you have a better idea on how to set them!
+
+ // Advise clients to expect heartbeats this often
+ pub const TIMEOUT: Duration = Duration::from_secs(20);
+ // Actually send heartbeats this often; this is shorter to allow time for the heartbeat to
+ // arrive before the advised deadline.
+ pub const INTERVAL: Duration = Duration::from_secs(15);
+}
+
+impl TryFrom<Heartbeat> for sse::Event {
+ type Error = serde_json::Error;
+
+ fn try_from(heartbeat: Heartbeat) -> Result<sse::Event, Self::Error> {
+ let heartbeat = serde_json::to_string_pretty(&heartbeat)?;
+ let heartbeat = sse::Event::default().data(heartbeat);
+ Ok(heartbeat)
+ }
+}
+
+impl TryFrom<Heartbeat> for sse::KeepAlive {
+ type Error = <sse::Event as TryFrom<Heartbeat>>::Error;
+
+ fn try_from(heartbeat: Heartbeat) -> Result<sse::KeepAlive, Self::Error> {
+ let event = heartbeat.try_into()?;
+ let keep_alive = KeepAlive::new().interval(Heartbeat::INTERVAL).event(event);
+ Ok(keep_alive)
+ }
+}
diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs
index 2ca8991..f6c91fa 100644
--- a/src/event/routes/get.rs
+++ b/src/event/routes/get.rs
@@ -11,7 +11,7 @@ use futures::stream::{Stream, StreamExt as _};
use crate::{
app::App,
error::{Internal, Unauthorized},
- event::{Event, Sequence, Sequenced as _, app, extract::LastEventId},
+ event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId},
token::{app::ValidateError, extract::Identity},
};
@@ -44,9 +44,11 @@ where
fn into_response(self) -> response::Response {
let Self(stream) = self;
let stream = stream.map(sse::Event::try_from);
- Sse::new(stream)
- .keep_alive(sse::KeepAlive::default())
- .into_response()
+ let heartbeat = match Heartbeat.try_into().map_err(Internal::from) {
+ Ok(heartbeat) => heartbeat,
+ Err(err) => return err.into_response(),
+ };
+ Sse::new(stream).keep_alive(heartbeat).into_response()
}
}