diff options
Diffstat (limited to 'src/events.rs')
| -rw-r--r-- | src/events.rs | 29 |
1 files changed, 24 insertions, 5 deletions
diff --git a/src/events.rs b/src/events.rs index 2d1e1f8..51f06b4 100644 --- a/src/events.rs +++ b/src/events.rs @@ -49,21 +49,33 @@ async fn on_events( let streams = stream::iter(query.channels) .then(|channel| { let app = app.clone(); - async move { app.channels().events(&channel, resume_at.as_ref()).await } + async move { + let events = app + .channels() + .events(&channel, resume_at.as_ref()) + .await? + .map_ok(move |message| ChannelEvent { + channel: channel.clone(), + message, + }); + + Ok::<_, BoxedError>(events) + } }) .try_collect::<Vec<_>>() .await?; - let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg))); + let stream = stream::select_all(streams).and_then(|msg| future::ready(to_sse_event(msg))); let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default()); Ok(sse) } -fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> { - let data = serde_json::to_string(&msg)?; +fn to_sse_event(event: ChannelEvent<BroadcastMessage>) -> Result<sse::Event, BoxedError> { + let data = serde_json::to_string(&event)?; let event = sse::Event::default() - .id(msg + .id(event + .message .sent_at .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) .data(&data); @@ -71,6 +83,13 @@ fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> { Ok(event) } +#[derive(serde::Serialize)] +struct ChannelEvent<M> { + channel: ChannelId, + #[serde(flatten)] + message: M, +} + pub struct LastEventId(pub String); static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); |
