diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-25 23:39:24 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-25 23:57:25 -0400 |
| commit | 45b5ff6a9aea2709e0e5568636105ccefbb7e66f (patch) | |
| tree | 72e4977c12f4bb79049763fbe79086d343596962 | |
| parent | cce1ab45db0de5e912fa7eec8d8a2cfe9a314078 (diff) | |
Stream over results while OK, using less code.
This also has the happy effect of removing an unwrap. This feels like a more coherent way of achieving the same result.
| -rw-r--r-- | src/events/broadcaster.rs | 27 |
1 files changed, 11 insertions, 16 deletions
diff --git a/src/events/broadcaster.rs b/src/events/broadcaster.rs index 6a1219a..dcaba91 100644 --- a/src/events/broadcaster.rs +++ b/src/events/broadcaster.rs @@ -77,24 +77,19 @@ impl Broadcaster { ) -> impl Stream<Item = broadcast::Message> + std::fmt::Debug { let rx = self.sender(channel).subscribe(); - BroadcastStream::from(rx) - .take_while(|r| { - future::ready(match r { - Ok(_) => true, - // Stop the stream here. This will disconnect SSE clients - // (see `routes.rs`), who will then resume from - // `Last-Event-ID`, allowing them to catch up by reading - // the skipped messages from the database. - Err(BroadcastStreamRecvError::Lagged(_)) => false, - }) - }) - .map(|r| { - // Since the previous transform stops at the first error, this - // should always hold. + BroadcastStream::from(rx).scan((), |(), r| { + future::ready(match r { + Ok(message) => Some(message), + // Stop the stream here. This will disconnect SSE clients + // (see `routes.rs`), who will then resume from + // `Last-Event-ID`, allowing them to catch up by reading + // the skipped messages from the database. // - // See also <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854>. - r.expect("after filtering, only `Ok` messages should remain") + // See also: + // <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854> + Err(BroadcastStreamRecvError::Lagged(_)) => None, }) + }) } // panic: if ``channel`` has not been previously registered, and was not |
