summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-25 23:39:24 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-25 23:57:25 -0400
commit45b5ff6a9aea2709e0e5568636105ccefbb7e66f (patch)
tree72e4977c12f4bb79049763fbe79086d343596962
parentcce1ab45db0de5e912fa7eec8d8a2cfe9a314078 (diff)
Stream over results while OK, using less code.
This also has the happy effect of removing an unwrap. This feels like a more coherent way of achieving the same result.
-rw-r--r--src/events/broadcaster.rs27
1 files changed, 11 insertions, 16 deletions
diff --git a/src/events/broadcaster.rs b/src/events/broadcaster.rs
index 6a1219a..dcaba91 100644
--- a/src/events/broadcaster.rs
+++ b/src/events/broadcaster.rs
@@ -77,24 +77,19 @@ impl Broadcaster {
) -> impl Stream<Item = broadcast::Message> + std::fmt::Debug {
let rx = self.sender(channel).subscribe();
- BroadcastStream::from(rx)
- .take_while(|r| {
- future::ready(match r {
- Ok(_) => true,
- // Stop the stream here. This will disconnect SSE clients
- // (see `routes.rs`), who will then resume from
- // `Last-Event-ID`, allowing them to catch up by reading
- // the skipped messages from the database.
- Err(BroadcastStreamRecvError::Lagged(_)) => false,
- })
- })
- .map(|r| {
- // Since the previous transform stops at the first error, this
- // should always hold.
+ BroadcastStream::from(rx).scan((), |(), r| {
+ future::ready(match r {
+ Ok(message) => Some(message),
+ // Stop the stream here. This will disconnect SSE clients
+ // (see `routes.rs`), who will then resume from
+ // `Last-Event-ID`, allowing them to catch up by reading
+ // the skipped messages from the database.
//
- // See also <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854>.
- r.expect("after filtering, only `Ok` messages should remain")
+ // See also:
+ // <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854>
+ Err(BroadcastStreamRecvError::Lagged(_)) => None,
})
+ })
}
// panic: if ``channel`` has not been previously registered, and was not