diff options
Diffstat (limited to 'src/event')
| -rw-r--r-- | src/event/mod.rs | 45 | ||||
| -rw-r--r-- | src/event/routes/get.rs | 10 |
2 files changed, 51 insertions, 4 deletions
diff --git a/src/event/mod.rs b/src/event/mod.rs index 3ab88ec..1f2ec42 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1,4 +1,7 @@ use crate::{channel, message, user}; +use axum::response::sse; +use axum::response::sse::KeepAlive; +use std::time::Duration; pub mod app; mod broadcaster; @@ -21,6 +24,16 @@ pub enum Event { Message(message::Event), } +// Serialized representation is intended to look like the serialized representation of `Event`, +// above - though heartbeat events contain only a type field and none of the other event gubbins. +// They don't have to participate in sequence numbering, aren't generated from stored data, and +// generally Are Weird. +#[derive(serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Heartbeat { + Heartbeat, +} + impl Sequenced for Event { fn instant(&self) -> Instant { match self { @@ -48,3 +61,35 @@ impl From<message::Event> for Event { Self::Message(event) } } + +impl Heartbeat { + // The following values are a first-rough-guess attempt to balance noticing connection problems + // quickly with managing the (modest) costs of delivering and processing heartbeats. Feel + // encouraged to tune them if you have a better idea on how to set them! + + // Advise clients to expect heartbeats this often + pub const TIMEOUT: Duration = Duration::from_secs(20); + // Actually send heartbeats this often; this is shorter to allow time for the heartbeat to + // arrive before the advised deadline. + pub const INTERVAL: Duration = Duration::from_secs(15); +} + +impl TryFrom<Heartbeat> for sse::Event { + type Error = serde_json::Error; + + fn try_from(heartbeat: Heartbeat) -> Result<sse::Event, Self::Error> { + let heartbeat = serde_json::to_string_pretty(&heartbeat)?; + let heartbeat = sse::Event::default().data(heartbeat); + Ok(heartbeat) + } +} + +impl TryFrom<Heartbeat> for sse::KeepAlive { + type Error = <sse::Event as TryFrom<Heartbeat>>::Error; + + fn try_from(heartbeat: Heartbeat) -> Result<sse::KeepAlive, Self::Error> { + let event = heartbeat.try_into()?; + let keep_alive = KeepAlive::new().interval(Heartbeat::INTERVAL).event(event); + Ok(keep_alive) + } +} diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs index 2ca8991..f6c91fa 100644 --- a/src/event/routes/get.rs +++ b/src/event/routes/get.rs @@ -11,7 +11,7 @@ use futures::stream::{Stream, StreamExt as _}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{Event, Sequence, Sequenced as _, app, extract::LastEventId}, + event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId}, token::{app::ValidateError, extract::Identity}, }; @@ -44,9 +44,11 @@ where fn into_response(self) -> response::Response { let Self(stream) = self; let stream = stream.map(sse::Event::try_from); - Sse::new(stream) - .keep_alive(sse::KeepAlive::default()) - .into_response() + let heartbeat = match Heartbeat.try_into().map_err(Internal::from) { + Ok(heartbeat) => heartbeat, + Err(err) => return err.into_response(), + }; + Sse::new(stream).keep_alive(heartbeat).into_response() } } |
