summaryrefslogtreecommitdiff
path: root/src/events.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events.rs')
-rw-r--r--src/events.rs17
1 files changed, 7 insertions, 10 deletions
diff --git a/src/events.rs b/src/events.rs
index 9b5901e..5d2dcf0 100644
--- a/src/events.rs
+++ b/src/events.rs
@@ -9,15 +9,12 @@ use axum::{
};
use axum_extra::extract::Query;
use chrono::{format::SecondsFormat, DateTime};
-use futures::{
- future,
- stream::{self, StreamExt as _, TryStreamExt as _},
-};
+use futures::stream::{self, StreamExt as _, TryStreamExt as _};
use crate::{
app::App,
- channel::repo::broadcast,
- error::{BoxedError, InternalError},
+ channel::{app::EventsError, repo::broadcast},
+ error::InternalError,
header::LastEventId,
repo::{channel, login::Login},
};
@@ -52,21 +49,21 @@ async fn on_events(
.channels()
.events(&channel, resume_at.as_ref())
.await?
- .map_ok(ChannelEvent::wrap(channel));
+ .map(ChannelEvent::wrap(channel));
- Ok::<_, BoxedError>(events)
+ Ok::<_, EventsError>(events)
}
})
.try_collect::<Vec<_>>()
.await?;
- let stream = stream::select_all(streams).and_then(|msg| future::ready(to_sse_event(msg)));
+ let stream = stream::select_all(streams).map(to_sse_event);
let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default());
Ok(sse)
}
-fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, BoxedError> {
+fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, serde_json::Error> {
let data = serde_json::to_string(&event)?;
let event = sse::Event::default()
.id(event