diff options
Diffstat (limited to 'src/channel/routes.rs')
| -rw-r--r-- | src/channel/routes.rs | 31 |
1 files changed, 27 insertions, 4 deletions
diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 0f95c69..4f83a8b 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -8,9 +8,14 @@ use axum::{ routing::{get, post}, Router, }; +use axum_extra::TypedHeader; +use chrono::{format::SecondsFormat, DateTime}; use futures::{future, stream::TryStreamExt as _}; -use super::repo::channels::Id as ChannelId; +use super::{ + header::LastEventId, + repo::{channels::Id as ChannelId, messages::BroadcastMessage}, +}; use crate::{ app::App, clock::RequestedAt, error::BoxedError, error::InternalError, login::repo::logins::Login, @@ -61,13 +66,31 @@ async fn on_events( Path(channel): Path<ChannelId>, State(app): State<App>, _: Login, // requires auth, but doesn't actually care who you are + last_event_id: Option<TypedHeader<LastEventId>>, ) -> Result<impl IntoResponse, InternalError> { + let resume_at = last_event_id + .map(|TypedHeader(header)| header) + .map(|LastEventId(header)| header) + .map(|header| DateTime::parse_from_rfc3339(&header)) + .transpose()? + .map(|ts| ts.to_utc()); + let stream = app .channels() - .events(&channel) + .events(&channel, resume_at.as_ref()) .await? - .and_then(|msg| future::ready(serde_json::to_string(&msg).map_err(BoxedError::from))) - .map_ok(|msg| sse::Event::default().data(&msg)); + .and_then(|msg| future::ready(to_event(msg))); Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default())) } + +fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> { + let data = serde_json::to_string(&msg)?; + let event = sse::Event::default() + .id(msg + .sent_at + .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) + .data(&data); + + Ok(event) +} |
