From 407ca8df6284ce1a4c649b018c7326fd195bbd26 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 13 Sep 2024 22:30:02 -0400 Subject: Support Last-Event-Id as a method of resuming channel events after a disconnect --- src/channel/routes.rs | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) (limited to 'src/channel/routes.rs') diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 0f95c69..4f83a8b 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -8,9 +8,14 @@ use axum::{ routing::{get, post}, Router, }; +use axum_extra::TypedHeader; +use chrono::{format::SecondsFormat, DateTime}; use futures::{future, stream::TryStreamExt as _}; -use super::repo::channels::Id as ChannelId; +use super::{ + header::LastEventId, + repo::{channels::Id as ChannelId, messages::BroadcastMessage}, +}; use crate::{ app::App, clock::RequestedAt, error::BoxedError, error::InternalError, login::repo::logins::Login, @@ -61,13 +66,31 @@ async fn on_events( Path(channel): Path, State(app): State, _: Login, // requires auth, but doesn't actually care who you are + last_event_id: Option>, ) -> Result { + let resume_at = last_event_id + .map(|TypedHeader(header)| header) + .map(|LastEventId(header)| header) + .map(|header| DateTime::parse_from_rfc3339(&header)) + .transpose()? + .map(|ts| ts.to_utc()); + let stream = app .channels() - .events(&channel) + .events(&channel, resume_at.as_ref()) .await? - .and_then(|msg| future::ready(serde_json::to_string(&msg).map_err(BoxedError::from))) - .map_ok(|msg| sse::Event::default().data(&msg)); + .and_then(|msg| future::ready(to_event(msg))); Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default())) } + +fn to_event(msg: BroadcastMessage) -> Result { + let data = serde_json::to_string(&msg)?; + let event = sse::Event::default() + .id(msg + .sent_at + .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) + .data(&data); + + Ok(event) +} -- cgit v1.2.3