From eff129bc1f29bcb1b2b9d10c6b49ab886edc83d6 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Sep 2024 18:17:02 -0400 Subject: Make `/api/events` a firehose endpoint. It now includes events for all channels. Clients are responsible for filtering. The schema for channel events has changed; it now includes a channel name and ID, in the same format as the sender's name and ID. They also now include a `"type"` field, whose only valid value (as of this writing) is `"message"`. This is groundwork for delivering message deletion (expiry) events to clients, and notifying clients of channel lifecycle events. --- src/events/broadcaster.rs | 77 ++++++++++++----------------------------------- 1 file changed, 20 insertions(+), 57 deletions(-) (limited to 'src/events/broadcaster.rs') diff --git a/src/events/broadcaster.rs b/src/events/broadcaster.rs index dcaba91..9697c0a 100644 --- a/src/events/broadcaster.rs +++ b/src/events/broadcaster.rs @@ -1,63 +1,35 @@ -use std::collections::{hash_map::Entry, HashMap}; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::{Arc, Mutex}; use futures::{future, stream::StreamExt as _, Stream}; -use sqlx::sqlite::SqlitePool; use tokio::sync::broadcast::{channel, Sender}; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; -use crate::{ - events::repo::broadcast, - repo::channel::{self, Provider as _}, -}; +use crate::events::types; -// Clones will share the same senders collection. +// Clones will share the same sender. #[derive(Clone)] pub struct Broadcaster { // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's // own advice: . Methods that // lock it must be sync. - senders: Arc>>>, + senders: Arc>>, } -impl Broadcaster { - pub async fn from_database(db: &SqlitePool) -> Result { - let mut tx = db.begin().await?; - let channels = tx.channels().all().await?; - tx.commit().await?; - - let channels = channels.iter().map(|c| &c.id); - let broadcaster = Self::new(channels); - Ok(broadcaster) - } - - fn new<'i>(channels: impl IntoIterator) -> Self { - let senders: HashMap<_, _> = channels - .into_iter() - .cloned() - .map(|id| (id, Self::make_sender())) - .collect(); +impl Default for Broadcaster { + fn default() -> Self { + let sender = Self::make_sender(); Self { - senders: Arc::new(Mutex::new(senders)), - } - } - - // panic: if ``channel`` is already registered. - pub fn register_channel(&self, channel: &channel::Id) { - match self.senders().entry(channel.clone()) { - // This ever happening indicates a serious logic error. - Entry::Occupied(_) => panic!("duplicate channel registration for channel {channel}"), - Entry::Vacant(entry) => { - entry.insert(Self::make_sender()); - } + senders: Arc::new(Mutex::new(sender)), } } +} - // panic: if ``channel`` has not been previously registered, and was not - // part of the initial set of channels. - pub fn broadcast(&self, channel: &channel::Id, message: &broadcast::Message) { - let tx = self.sender(channel); +impl Broadcaster { + // panic: if ``message.channel.id`` has not been previously registered, + // and was not part of the initial set of channels. + pub fn broadcast(&self, message: &types::ChannelEvent) { + let tx = self.sender(); // Per the Tokio docs, the returned error is only used to indicate that // there are no receivers. In this use case, that's fine; a lack of @@ -71,15 +43,12 @@ impl Broadcaster { // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. - pub fn subscribe( - &self, - channel: &channel::Id, - ) -> impl Stream + std::fmt::Debug { - let rx = self.sender(channel).subscribe(); + pub fn subscribe(&self) -> impl Stream + std::fmt::Debug { + let rx = self.sender().subscribe(); BroadcastStream::from(rx).scan((), |(), r| { future::ready(match r { - Ok(message) => Some(message), + Ok(event) => Some(event), // Stop the stream here. This will disconnect SSE clients // (see `routes.rs`), who will then resume from // `Last-Event-ID`, allowing them to catch up by reading @@ -92,17 +61,11 @@ impl Broadcaster { }) } - // panic: if ``channel`` has not been previously registered, and was not - // part of the initial set of channels. - fn sender(&self, channel: &channel::Id) -> Sender { - self.senders()[channel].clone() - } - - fn senders(&self) -> MutexGuard>> { - self.senders.lock().unwrap() // propagate panics when mutex is poisoned + fn sender(&self) -> Sender { + self.senders.lock().unwrap().clone() } - fn make_sender() -> Sender { + fn make_sender() -> Sender { // Queue depth of 16 chosen entirely arbitrarily. Don't read too much // into it. let (tx, _) = channel(16); -- cgit v1.2.3