use std::sync::{Arc, Mutex}; use futures::{future, stream::StreamExt as _, Stream}; use tokio::sync::broadcast::{channel, Sender}; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; // Clones will share the same sender. #[derive(Clone)] pub struct Broadcaster { // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's // own advice: . Methods that // lock it must be sync. senders: Arc>>, } impl Default for Broadcaster where M: Clone + Send + std::fmt::Debug + 'static, { fn default() -> Self { let sender = Self::make_sender(); Self { senders: Arc::new(Mutex::new(sender)), } } } impl Broadcaster where M: Clone + Send + std::fmt::Debug + 'static, { // panic: if ``message.channel.id`` has not been previously registered, // and was not part of the initial set of channels. pub fn broadcast(&self, message: &M) { let tx = self.sender(); // Per the Tokio docs, the returned error is only used to indicate that // there are no receivers. In this use case, that's fine; a lack of // listening consumers (chat clients) when a message is sent isn't an // error. // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. let _ = tx.send(message.clone()); } // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. pub fn subscribe(&self) -> impl Stream + std::fmt::Debug { let rx = self.sender().subscribe(); BroadcastStream::from(rx).scan((), |(), r| { future::ready(match r { Ok(event) => Some(event), // Stop the stream here. This will disconnect SSE clients // (see `routes.rs`), who will then resume from // `Last-Event-ID`, allowing them to catch up by reading // the skipped messages from the database. // // See also: // Err(BroadcastStreamRecvError::Lagged(_)) => None, }) }) } fn sender(&self) -> Sender { self.senders.lock().unwrap().clone() } fn make_sender() -> Sender { // Queue depth of 16 chosen entirely arbitrarily. Don't read too much // into it. let (tx, _) = channel(16); tx } }