From 2b4cf5c62ff82fa408a4f82bde0b561ff3b15497 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 18 Sep 2024 02:22:09 -0400 Subject: Make BoxedError an implementation detail of InternalError. --- src/events.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) (limited to 'src/events.rs') diff --git a/src/events.rs b/src/events.rs index 9b5901e..5d2dcf0 100644 --- a/src/events.rs +++ b/src/events.rs @@ -9,15 +9,12 @@ use axum::{ }; use axum_extra::extract::Query; use chrono::{format::SecondsFormat, DateTime}; -use futures::{ - future, - stream::{self, StreamExt as _, TryStreamExt as _}, -}; +use futures::stream::{self, StreamExt as _, TryStreamExt as _}; use crate::{ app::App, - channel::repo::broadcast, - error::{BoxedError, InternalError}, + channel::{app::EventsError, repo::broadcast}, + error::InternalError, header::LastEventId, repo::{channel, login::Login}, }; @@ -52,21 +49,21 @@ async fn on_events( .channels() .events(&channel, resume_at.as_ref()) .await? - .map_ok(ChannelEvent::wrap(channel)); + .map(ChannelEvent::wrap(channel)); - Ok::<_, BoxedError>(events) + Ok::<_, EventsError>(events) } }) .try_collect::>() .await?; - let stream = stream::select_all(streams).and_then(|msg| future::ready(to_sse_event(msg))); + let stream = stream::select_all(streams).map(to_sse_event); let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default()); Ok(sse) } -fn to_sse_event(event: ChannelEvent) -> Result { +fn to_sse_event(event: ChannelEvent) -> Result { let data = serde_json::to_string(&event)?; let event = sse::Event::default() .id(event -- cgit v1.2.3