summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2025-08-26 18:44:34 -0400
committerOwen Jacobson <owen@grimoire.ca>2025-08-26 18:44:34 -0400
commitf839449d5505b5352bd0da931b980a7a0305234f (patch)
tree93c95548126eea048cd8962345b720b883d391c1
parent17c38585fc2623a6b0196146cf6b6df9955ce979 (diff)
Remove entirely-redundant synchronization inside of Broadcaster.
Per <https://docs.rs/tokio/latest/tokio/sync/broadcast/struct.Sender.html>, a `Sender` is safe to share between threads. The clone behaviour we want is also provided by its `Clone` impl directly, and we don't need to wrap the sender in an `Arc` to share it. It's amazing what you can find in the docs.
-rw-r--r--src/broadcast.rs23
1 files changed, 5 insertions, 18 deletions
diff --git a/src/broadcast.rs b/src/broadcast.rs
index ee42c08..dae7641 100644
--- a/src/broadcast.rs
+++ b/src/broadcast.rs
@@ -1,16 +1,11 @@
-use std::sync::{Arc, Mutex};
-
use futures::{Stream, future, stream::StreamExt as _};
use tokio::sync::broadcast::{Sender, channel};
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
-// Clones will share the same sender.
+// Clones will share the same channel.
#[derive(Clone)]
pub struct Broadcaster<M> {
- // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's
- // own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that
- // lock it must be sync.
- senders: Arc<Mutex<Sender<M>>>,
+ sender: Sender<M>,
}
impl<M> Default for Broadcaster<M>
@@ -20,9 +15,7 @@ where
fn default() -> Self {
let sender = Self::make_sender();
- Self {
- senders: Arc::new(Mutex::new(sender)),
- }
+ Self { sender }
}
}
@@ -31,8 +24,6 @@ where
M: Clone + Send + std::fmt::Debug + 'static,
{
pub fn broadcast(&self, message: M) {
- let tx = self.sender();
-
// Per the Tokio docs, the returned error is only used to indicate that
// there are no receivers. In this use case, that's fine; a lack of
// listening consumers (chat clients) when a message is sent isn't an
@@ -40,7 +31,7 @@ where
//
// The successful return value, which includes the number of active
// receivers, also isn't that interesting to us.
- let _ = tx.send(message);
+ let _ = self.sender.send(message);
}
// If `M` is a type that can be obtained from an iterator, such as a `Vec`, and if `I` is an
@@ -62,7 +53,7 @@ where
}
pub fn subscribe(&self) -> impl Stream<Item = M> + std::fmt::Debug + use<M> {
- let rx = self.sender().subscribe();
+ let rx = self.sender.subscribe();
BroadcastStream::from(rx).scan((), |(), r| {
// The following could technically be `r.ok()`, and is exactly
@@ -83,10 +74,6 @@ where
})
}
- fn sender(&self) -> Sender<M> {
- self.senders.lock().unwrap().clone()
- }
-
fn make_sender() -> Sender<M> {
// Queue depth of 16 chosen entirely arbitrarily. Don't read too much
// into it.