summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-18 02:22:09 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-18 12:18:45 -0400
commit2b4cf5c62ff82fa408a4f82bde0b561ff3b15497 (patch)
tree66d1d695a54eef27c4509fb2f5e7e8372a96ba1e /src/channel/app.rs
parentcce6662d635bb2115f9f2a7bab92cc105166e761 (diff)
Make BoxedError an implementation detail of InternalError.
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs44
1 files changed, 31 insertions, 13 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index e72564d..0a28fb6 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -3,17 +3,16 @@ use std::sync::{Arc, Mutex, MutexGuard};
use futures::{
future,
- stream::{self, StreamExt as _, TryStreamExt as _},
+ stream::{self, StreamExt as _},
Stream,
};
use sqlx::sqlite::SqlitePool;
use tokio::sync::broadcast::{channel, Sender};
-use tokio_stream::wrappers::BroadcastStream;
+use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use super::repo::broadcast::{self, Provider as _};
use crate::{
clock::DateTime,
- error::BoxedError,
repo::{
channel::{self, Channel, Provider as _},
error::NotFound as _,
@@ -75,17 +74,16 @@ impl<'a> Channels<'a> {
&self,
channel: &channel::Id,
resume_at: Option<&DateTime>,
- ) -> Result<impl Stream<Item = Result<broadcast::Message, BoxedError>> + 'static, EventsError>
- {
- fn skip_stale<E>(
+ ) -> Result<impl Stream<Item = broadcast::Message> + 'static, EventsError> {
+ fn skip_stale(
resume_at: Option<&DateTime>,
- ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<Result<bool, E>> {
+ ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
let resume_at = resume_at.cloned();
move |msg| {
- future::ready(Ok(match resume_at {
+ future::ready(match resume_at {
None => false,
Some(resume_at) => msg.sent_at <= resume_at,
- }))
+ })
}
}
let mut tx = self
@@ -98,13 +96,12 @@ impl<'a> Channels<'a> {
let live_messages = self
.broadcaster
.listen(&channel.id)
- .map_err(BoxedError::from)
- .try_skip_while(skip_stale(resume_at));
+ .skip_while(skip_stale(resume_at));
let stored_messages = tx.broadcast().replay(&channel, resume_at).await?;
tx.commit().await?;
- let stored_messages = stream::iter(stored_messages).map(Ok);
+ let stored_messages = stream::iter(stored_messages);
Ok(stored_messages.chain(live_messages))
}
@@ -176,15 +173,36 @@ impl Broadcaster {
// there are no receivers. In this use case, that's fine; a lack of
// listening consumers (chat clients) when a message is sent isn't an
// error.
+ //
+ // The successful return value, which includes the number of active
+ // receivers, also isn't that interesting to us.
let _ = tx.send(message);
}
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn listen(&self, channel: &channel::Id) -> BroadcastStream<broadcast::Message> {
+ pub fn listen(&self, channel: &channel::Id) -> impl Stream<Item = broadcast::Message> {
let rx = self.sender(channel).subscribe();
BroadcastStream::from(rx)
+ .take_while(|r| {
+ future::ready(match r {
+ Ok(_) => true,
+ // Stop the stream here. This will disconnect SSE clients
+ // (see `routes.rs`), who will then resume from
+ // `Last-Event-ID`, allowing them to catch up by reading
+ // the skipped messages from the database.
+ Err(BroadcastStreamRecvError::Lagged(_)) => false,
+ })
+ })
+ .map(|r| {
+ // Since the previous transform stops at the first error, this
+ // should always hold.
+ //
+ // See also <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854>.
+ debug_assert!(r.is_ok());
+ r.unwrap()
+ })
}
// panic: if ``channel`` has not been previously registered, and was not