summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/boot/app.rs4
-rw-r--r--src/boot/mod.rs15
-rw-r--r--src/event/mod.rs45
-rw-r--r--src/event/routes/get.rs10
4 files changed, 67 insertions, 7 deletions
diff --git a/src/boot/app.rs b/src/boot/app.rs
index f531afe..cd45c38 100644
--- a/src/boot/app.rs
+++ b/src/boot/app.rs
@@ -3,7 +3,7 @@ use sqlx::sqlite::SqlitePool;
use super::Snapshot;
use crate::{
channel::{self, repo::Provider as _},
- event::repo::Provider as _,
+ event::{Heartbeat, repo::Provider as _},
message::repo::Provider as _,
name,
user::{self, repo::Provider as _},
@@ -21,6 +21,7 @@ impl<'a> Boot<'a> {
pub async fn snapshot(&self) -> Result<Snapshot, Error> {
let mut tx = self.db.begin().await?;
let resume_point = tx.sequence().current().await?;
+ let heartbeat = Heartbeat::TIMEOUT;
let users = tx.users().all(resume_point).await?;
let channels = tx.channels().all(resume_point).await?;
@@ -45,6 +46,7 @@ impl<'a> Boot<'a> {
Ok(Snapshot {
resume_point,
+ heartbeat,
users,
channels,
messages,
diff --git a/src/boot/mod.rs b/src/boot/mod.rs
index c52b088..122bd53 100644
--- a/src/boot/mod.rs
+++ b/src/boot/mod.rs
@@ -1,14 +1,25 @@
+use crate::{channel::Channel, event::Sequence, message::Message, user::User};
+use serde::Serialize;
+use std::time::Duration;
+
pub mod app;
mod routes;
-use crate::{channel::Channel, event::Sequence, message::Message, user::User};
-
pub use self::routes::router;
#[derive(serde::Serialize)]
pub struct Snapshot {
pub resume_point: Sequence,
+ #[serde(serialize_with = "as_seconds")]
+ pub heartbeat: Duration,
pub users: Vec<User>,
pub channels: Vec<Channel>,
pub messages: Vec<Message>,
}
+
+fn as_seconds<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
+where
+ S: serde::Serializer,
+{
+ duration.as_secs().serialize(serializer)
+}
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 3ab88ec..1f2ec42 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -1,4 +1,7 @@
use crate::{channel, message, user};
+use axum::response::sse;
+use axum::response::sse::KeepAlive;
+use std::time::Duration;
pub mod app;
mod broadcaster;
@@ -21,6 +24,16 @@ pub enum Event {
Message(message::Event),
}
+// Serialized representation is intended to look like the serialized representation of `Event`,
+// above - though heartbeat events contain only a type field and none of the other event gubbins.
+// They don't have to participate in sequence numbering, aren't generated from stored data, and
+// generally Are Weird.
+#[derive(serde::Serialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum Heartbeat {
+ Heartbeat,
+}
+
impl Sequenced for Event {
fn instant(&self) -> Instant {
match self {
@@ -48,3 +61,35 @@ impl From<message::Event> for Event {
Self::Message(event)
}
}
+
+impl Heartbeat {
+ // The following values are a first-rough-guess attempt to balance noticing connection problems
+ // quickly with managing the (modest) costs of delivering and processing heartbeats. Feel
+ // encouraged to tune them if you have a better idea on how to set them!
+
+ // Advise clients to expect heartbeats this often
+ pub const TIMEOUT: Duration = Duration::from_secs(20);
+ // Actually send heartbeats this often; this is shorter to allow time for the heartbeat to
+ // arrive before the advised deadline.
+ pub const INTERVAL: Duration = Duration::from_secs(15);
+}
+
+impl TryFrom<Heartbeat> for sse::Event {
+ type Error = serde_json::Error;
+
+ fn try_from(heartbeat: Heartbeat) -> Result<sse::Event, Self::Error> {
+ let heartbeat = serde_json::to_string_pretty(&heartbeat)?;
+ let heartbeat = sse::Event::default().data(heartbeat);
+ Ok(heartbeat)
+ }
+}
+
+impl TryFrom<Heartbeat> for sse::KeepAlive {
+ type Error = <sse::Event as TryFrom<Heartbeat>>::Error;
+
+ fn try_from(heartbeat: Heartbeat) -> Result<sse::KeepAlive, Self::Error> {
+ let event = heartbeat.try_into()?;
+ let keep_alive = KeepAlive::new().interval(Heartbeat::INTERVAL).event(event);
+ Ok(keep_alive)
+ }
+}
diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs
index 2ca8991..f6c91fa 100644
--- a/src/event/routes/get.rs
+++ b/src/event/routes/get.rs
@@ -11,7 +11,7 @@ use futures::stream::{Stream, StreamExt as _};
use crate::{
app::App,
error::{Internal, Unauthorized},
- event::{Event, Sequence, Sequenced as _, app, extract::LastEventId},
+ event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId},
token::{app::ValidateError, extract::Identity},
};
@@ -44,9 +44,11 @@ where
fn into_response(self) -> response::Response {
let Self(stream) = self;
let stream = stream.map(sse::Event::try_from);
- Sse::new(stream)
- .keep_alive(sse::KeepAlive::default())
- .into_response()
+ let heartbeat = match Heartbeat.try_into().map_err(Internal::from) {
+ Ok(heartbeat) => heartbeat,
+ Err(err) => return err.into_response(),
+ };
+ Sse::new(stream).keep_alive(heartbeat).into_response()
}
}