From f839449d5505b5352bd0da931b980a7a0305234f Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 18:44:34 -0400 Subject: Remove entirely-redundant synchronization inside of Broadcaster. Per , a `Sender` is safe to share between threads. The clone behaviour we want is also provided by its `Clone` impl directly, and we don't need to wrap the sender in an `Arc` to share it. It's amazing what you can find in the docs. --- src/broadcast.rs | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) (limited to 'src/broadcast.rs') diff --git a/src/broadcast.rs b/src/broadcast.rs index ee42c08..dae7641 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -1,16 +1,11 @@ -use std::sync::{Arc, Mutex}; - use futures::{Stream, future, stream::StreamExt as _}; use tokio::sync::broadcast::{Sender, channel}; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; -// Clones will share the same sender. +// Clones will share the same channel. #[derive(Clone)] pub struct Broadcaster { - // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's - // own advice: . Methods that - // lock it must be sync. - senders: Arc>>, + sender: Sender, } impl Default for Broadcaster @@ -20,9 +15,7 @@ where fn default() -> Self { let sender = Self::make_sender(); - Self { - senders: Arc::new(Mutex::new(sender)), - } + Self { sender } } } @@ -31,8 +24,6 @@ where M: Clone + Send + std::fmt::Debug + 'static, { pub fn broadcast(&self, message: M) { - let tx = self.sender(); - // Per the Tokio docs, the returned error is only used to indicate that // there are no receivers. In this use case, that's fine; a lack of // listening consumers (chat clients) when a message is sent isn't an @@ -40,7 +31,7 @@ where // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message); + let _ = self.sender.send(message); } // If `M` is a type that can be obtained from an iterator, such as a `Vec`, and if `I` is an @@ -62,7 +53,7 @@ where } pub fn subscribe(&self) -> impl Stream + std::fmt::Debug + use { - let rx = self.sender().subscribe(); + let rx = self.sender.subscribe(); BroadcastStream::from(rx).scan((), |(), r| { // The following could technically be `r.ok()`, and is exactly @@ -83,10 +74,6 @@ where }) } - fn sender(&self) -> Sender { - self.senders.lock().unwrap().clone() - } - fn make_sender() -> Sender { // Queue depth of 16 chosen entirely arbitrarily. Don't read too much // into it. -- cgit v1.2.3