diff options
Diffstat (limited to 'src/events.rs')
| -rw-r--r-- | src/events.rs | 17 |
1 files changed, 7 insertions, 10 deletions
diff --git a/src/events.rs b/src/events.rs index 9b5901e..5d2dcf0 100644 --- a/src/events.rs +++ b/src/events.rs @@ -9,15 +9,12 @@ use axum::{ }; use axum_extra::extract::Query; use chrono::{format::SecondsFormat, DateTime}; -use futures::{ - future, - stream::{self, StreamExt as _, TryStreamExt as _}, -}; +use futures::stream::{self, StreamExt as _, TryStreamExt as _}; use crate::{ app::App, - channel::repo::broadcast, - error::{BoxedError, InternalError}, + channel::{app::EventsError, repo::broadcast}, + error::InternalError, header::LastEventId, repo::{channel, login::Login}, }; @@ -52,21 +49,21 @@ async fn on_events( .channels() .events(&channel, resume_at.as_ref()) .await? - .map_ok(ChannelEvent::wrap(channel)); + .map(ChannelEvent::wrap(channel)); - Ok::<_, BoxedError>(events) + Ok::<_, EventsError>(events) } }) .try_collect::<Vec<_>>() .await?; - let stream = stream::select_all(streams).and_then(|msg| future::ready(to_sse_event(msg))); + let stream = stream::select_all(streams).map(to_sse_event); let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default()); Ok(sse) } -fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, BoxedError> { +fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, serde_json::Error> { let data = serde_json::to_string(&event)?; let event = sse::Event::default() .id(event |
