diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/events/broadcaster.rs | 27 |
1 files changed, 11 insertions, 16 deletions
diff --git a/src/events/broadcaster.rs b/src/events/broadcaster.rs index 6a1219a..dcaba91 100644 --- a/src/events/broadcaster.rs +++ b/src/events/broadcaster.rs @@ -77,24 +77,19 @@ impl Broadcaster { ) -> impl Stream<Item = broadcast::Message> + std::fmt::Debug { let rx = self.sender(channel).subscribe(); - BroadcastStream::from(rx) - .take_while(|r| { - future::ready(match r { - Ok(_) => true, - // Stop the stream here. This will disconnect SSE clients - // (see `routes.rs`), who will then resume from - // `Last-Event-ID`, allowing them to catch up by reading - // the skipped messages from the database. - Err(BroadcastStreamRecvError::Lagged(_)) => false, - }) - }) - .map(|r| { - // Since the previous transform stops at the first error, this - // should always hold. + BroadcastStream::from(rx).scan((), |(), r| { + future::ready(match r { + Ok(message) => Some(message), + // Stop the stream here. This will disconnect SSE clients + // (see `routes.rs`), who will then resume from + // `Last-Event-ID`, allowing them to catch up by reading + // the skipped messages from the database. // - // See also <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854>. - r.expect("after filtering, only `Ok` messages should remain") + // See also: + // <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854> + Err(BroadcastStreamRecvError::Lagged(_)) => None, }) + }) } // panic: if ``channel`` has not been previously registered, and was not |
