use std::sync::{Arc, Mutex}; use futures::{Stream, future, stream::StreamExt as _}; use tokio::sync::broadcast::{Sender, channel}; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; // Clones will share the same sender. #[derive(Clone)] pub struct Broadcaster { // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's // own advice: . Methods that // lock it must be sync. senders: Arc>>, } impl Default for Broadcaster where M: Clone + Send + std::fmt::Debug + 'static, { fn default() -> Self { let sender = Self::make_sender(); Self { senders: Arc::new(Mutex::new(sender)), } } } impl Broadcaster where M: Clone + Send + std::fmt::Debug + 'static, { // panic: if ``message.channel.id`` has not been previously registered, // and was not part of the initial set of channels. pub fn broadcast(&self, message: impl Into) { let tx = self.sender(); // Per the Tokio docs, the returned error is only used to indicate that // there are no receivers. In this use case, that's fine; a lack of // listening consumers (chat clients) when a message is sent isn't an // error. // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. let _ = tx.send(message.into()); } // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. pub fn subscribe(&self) -> impl Stream + std::fmt::Debug + use { let rx = self.sender().subscribe(); BroadcastStream::from(rx).scan((), |(), r| { // The following could technically be `r.ok()`, and is exactly // equivalent to it, but spelling out the match arms means we'll // find out at compile time if new errors get added to // `BroadcastStreamRecvError`. #[allow(clippy::manual_ok_err)] future::ready(match r { Ok(event) => Some(event), // Stop the stream here. This will disconnect SSE clients (see the `/api/events` // endpoint), who will then resume from `Last-Event-ID`, allowing them to catch up // by reading the skipped messages from the database. // // See also: // Err(BroadcastStreamRecvError::Lagged(_)) => None, }) }) } fn sender(&self) -> Sender { self.senders.lock().unwrap().clone() } fn make_sender() -> Sender { // Queue depth of 16 chosen entirely arbitrarily. Don't read too much // into it. let (tx, _) = channel(16); tx } }