diff options
Diffstat (limited to 'src/events/broadcaster.rs')
| -rw-r--r-- | src/events/broadcaster.rs | 75 |
1 files changed, 2 insertions, 73 deletions
diff --git a/src/events/broadcaster.rs b/src/events/broadcaster.rs index 9697c0a..6b664cb 100644 --- a/src/events/broadcaster.rs +++ b/src/events/broadcaster.rs @@ -1,74 +1,3 @@ -use std::sync::{Arc, Mutex}; +use crate::{broadcast, events::types}; -use futures::{future, stream::StreamExt as _, Stream}; -use tokio::sync::broadcast::{channel, Sender}; -use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; - -use crate::events::types; - -// Clones will share the same sender. -#[derive(Clone)] -pub struct Broadcaster { - // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's - // own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that - // lock it must be sync. - senders: Arc<Mutex<Sender<types::ChannelEvent>>>, -} - -impl Default for Broadcaster { - fn default() -> Self { - let sender = Self::make_sender(); - - Self { - senders: Arc::new(Mutex::new(sender)), - } - } -} - -impl Broadcaster { - // panic: if ``message.channel.id`` has not been previously registered, - // and was not part of the initial set of channels. - pub fn broadcast(&self, message: &types::ChannelEvent) { - let tx = self.sender(); - - // Per the Tokio docs, the returned error is only used to indicate that - // there are no receivers. In this use case, that's fine; a lack of - // listening consumers (chat clients) when a message is sent isn't an - // error. - // - // The successful return value, which includes the number of active - // receivers, also isn't that interesting to us. - let _ = tx.send(message.clone()); - } - - // panic: if ``channel`` has not been previously registered, and was not - // part of the initial set of channels. - pub fn subscribe(&self) -> impl Stream<Item = types::ChannelEvent> + std::fmt::Debug { - let rx = self.sender().subscribe(); - - BroadcastStream::from(rx).scan((), |(), r| { - future::ready(match r { - Ok(event) => Some(event), - // Stop the stream here. This will disconnect SSE clients - // (see `routes.rs`), who will then resume from - // `Last-Event-ID`, allowing them to catch up by reading - // the skipped messages from the database. - // - // See also: - // <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854> - Err(BroadcastStreamRecvError::Lagged(_)) => None, - }) - }) - } - - fn sender(&self) -> Sender<types::ChannelEvent> { - self.senders.lock().unwrap().clone() - } - - fn make_sender() -> Sender<types::ChannelEvent> { - // Queue depth of 16 chosen entirely arbitrarily. Don't read too much - // into it. - let (tx, _) = channel(16); - tx - } -} +pub type Broadcaster = broadcast::Broadcaster<types::ChannelEvent>; |
