summaryrefslogtreecommitdiff
path: root/src/channel/routes.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel/routes.rs')
-rw-r--r--src/channel/routes.rs31
1 files changed, 27 insertions, 4 deletions
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index 0f95c69..4f83a8b 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -8,9 +8,14 @@ use axum::{
routing::{get, post},
Router,
};
+use axum_extra::TypedHeader;
+use chrono::{format::SecondsFormat, DateTime};
use futures::{future, stream::TryStreamExt as _};
-use super::repo::channels::Id as ChannelId;
+use super::{
+ header::LastEventId,
+ repo::{channels::Id as ChannelId, messages::BroadcastMessage},
+};
use crate::{
app::App, clock::RequestedAt, error::BoxedError, error::InternalError,
login::repo::logins::Login,
@@ -61,13 +66,31 @@ async fn on_events(
Path(channel): Path<ChannelId>,
State(app): State<App>,
_: Login, // requires auth, but doesn't actually care who you are
+ last_event_id: Option<TypedHeader<LastEventId>>,
) -> Result<impl IntoResponse, InternalError> {
+ let resume_at = last_event_id
+ .map(|TypedHeader(header)| header)
+ .map(|LastEventId(header)| header)
+ .map(|header| DateTime::parse_from_rfc3339(&header))
+ .transpose()?
+ .map(|ts| ts.to_utc());
+
let stream = app
.channels()
- .events(&channel)
+ .events(&channel, resume_at.as_ref())
.await?
- .and_then(|msg| future::ready(serde_json::to_string(&msg).map_err(BoxedError::from)))
- .map_ok(|msg| sse::Event::default().data(&msg));
+ .and_then(|msg| future::ready(to_event(msg)));
Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default()))
}
+
+fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
+ let data = serde_json::to_string(&msg)?;
+ let event = sse::Event::default()
+ .id(msg
+ .sent_at
+ .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
+ .data(&data);
+
+ Ok(event)
+}