summaryrefslogtreecommitdiff
path: root/src/events.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events.rs')
-rw-r--r--src/events.rs29
1 files changed, 24 insertions, 5 deletions
diff --git a/src/events.rs b/src/events.rs
index 2d1e1f8..51f06b4 100644
--- a/src/events.rs
+++ b/src/events.rs
@@ -49,21 +49,33 @@ async fn on_events(
let streams = stream::iter(query.channels)
.then(|channel| {
let app = app.clone();
- async move { app.channels().events(&channel, resume_at.as_ref()).await }
+ async move {
+ let events = app
+ .channels()
+ .events(&channel, resume_at.as_ref())
+ .await?
+ .map_ok(move |message| ChannelEvent {
+ channel: channel.clone(),
+ message,
+ });
+
+ Ok::<_, BoxedError>(events)
+ }
})
.try_collect::<Vec<_>>()
.await?;
- let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg)));
+ let stream = stream::select_all(streams).and_then(|msg| future::ready(to_sse_event(msg)));
let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default());
Ok(sse)
}
-fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
- let data = serde_json::to_string(&msg)?;
+fn to_sse_event(event: ChannelEvent<BroadcastMessage>) -> Result<sse::Event, BoxedError> {
+ let data = serde_json::to_string(&event)?;
let event = sse::Event::default()
- .id(msg
+ .id(event
+ .message
.sent_at
.to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
.data(&data);
@@ -71,6 +83,13 @@ fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
Ok(event)
}
+#[derive(serde::Serialize)]
+struct ChannelEvent<M> {
+ channel: ChannelId,
+ #[serde(flatten)]
+ message: M,
+}
+
pub struct LastEventId(pub String);
static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");