From 17c38585fc2623a6b0196146cf6b6df9955ce979 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 18:35:54 -0400 Subject: Consolidate `events.map(…).collect()` calls into `Broadcaster`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This conversion, from an iterator of type-specific events (say, `user::Event` or `message::Event`), into a `Vec`, is prevasive, and it needs to be done each time. Having Broadcaster expose a support method for this cuts down on the repetition, at the cost of a slightly alarming amount of type-system nonsense in `broadcast_from`. Historical footnote: the internal message structure is a Vec and not an individual message so that bulk operations, like expiring channels and messages, won't disconnect everyone if they happen to dispatch more than sixteen messages (current queue depth limit) at once. We trade allocation and memory pressure for keeping the connections alive. _Most_ event publishing is an iterator of one item, so the Vec allocation is redundant. --- src/broadcast.rs | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) (limited to 'src/broadcast.rs') diff --git a/src/broadcast.rs b/src/broadcast.rs index 6e1f04d..ee42c08 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -30,7 +30,7 @@ impl Broadcaster where M: Clone + Send + std::fmt::Debug + 'static, { - pub fn broadcast(&self, message: impl Into) { + pub fn broadcast(&self, message: M) { let tx = self.sender(); // Per the Tokio docs, the returned error is only used to indicate that @@ -40,7 +40,25 @@ where // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message.into()); + let _ = tx.send(message); + } + + // If `M` is a type that can be obtained from an iterator, such as a `Vec`, and if `I` is an + // iterable of items that can be collected into `M`, then this will construct an `M` from the + // passed event iterator, converting each element as it goes. This emits one message (as `M`), + // containing whatever we collect out of `messages`. + // + // This is mostly meant for handling synchronized entity events, which tend to be generated as + // iterables of domain-specific event types, like `user::Event`, but broadcast as `Vec` + // for consumption by outside clients. + pub fn broadcast_from(&self, messages: I) + where + I: IntoIterator, + M: FromIterator, + E: From, + { + let message = messages.into_iter().map(Into::into).collect(); + self.broadcast(message); } pub fn subscribe(&self) -> impl Stream + std::fmt::Debug + use { -- cgit v1.2.3 From f839449d5505b5352bd0da931b980a7a0305234f Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 18:44:34 -0400 Subject: Remove entirely-redundant synchronization inside of Broadcaster. Per , a `Sender` is safe to share between threads. The clone behaviour we want is also provided by its `Clone` impl directly, and we don't need to wrap the sender in an `Arc` to share it. It's amazing what you can find in the docs. --- src/broadcast.rs | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) (limited to 'src/broadcast.rs') diff --git a/src/broadcast.rs b/src/broadcast.rs index ee42c08..dae7641 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -1,16 +1,11 @@ -use std::sync::{Arc, Mutex}; - use futures::{Stream, future, stream::StreamExt as _}; use tokio::sync::broadcast::{Sender, channel}; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; -// Clones will share the same sender. +// Clones will share the same channel. #[derive(Clone)] pub struct Broadcaster { - // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's - // own advice: . Methods that - // lock it must be sync. - senders: Arc>>, + sender: Sender, } impl Default for Broadcaster @@ -20,9 +15,7 @@ where fn default() -> Self { let sender = Self::make_sender(); - Self { - senders: Arc::new(Mutex::new(sender)), - } + Self { sender } } } @@ -31,8 +24,6 @@ where M: Clone + Send + std::fmt::Debug + 'static, { pub fn broadcast(&self, message: M) { - let tx = self.sender(); - // Per the Tokio docs, the returned error is only used to indicate that // there are no receivers. In this use case, that's fine; a lack of // listening consumers (chat clients) when a message is sent isn't an @@ -40,7 +31,7 @@ where // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message); + let _ = self.sender.send(message); } // If `M` is a type that can be obtained from an iterator, such as a `Vec`, and if `I` is an @@ -62,7 +53,7 @@ where } pub fn subscribe(&self) -> impl Stream + std::fmt::Debug + use { - let rx = self.sender().subscribe(); + let rx = self.sender.subscribe(); BroadcastStream::from(rx).scan((), |(), r| { // The following could technically be `r.ok()`, and is exactly @@ -83,10 +74,6 @@ where }) } - fn sender(&self) -> Sender { - self.senders.lock().unwrap().clone() - } - fn make_sender() -> Sender { // Queue depth of 16 chosen entirely arbitrarily. Don't read too much // into it. -- cgit v1.2.3