summaryrefslogtreecommitdiff
path: root/src/events/broadcaster.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/broadcaster.rs')
-rw-r--r--src/events/broadcaster.rs77
1 files changed, 20 insertions, 57 deletions
diff --git a/src/events/broadcaster.rs b/src/events/broadcaster.rs
index dcaba91..9697c0a 100644
--- a/src/events/broadcaster.rs
+++ b/src/events/broadcaster.rs
@@ -1,63 +1,35 @@
-use std::collections::{hash_map::Entry, HashMap};
-use std::sync::{Arc, Mutex, MutexGuard};
+use std::sync::{Arc, Mutex};
use futures::{future, stream::StreamExt as _, Stream};
-use sqlx::sqlite::SqlitePool;
use tokio::sync::broadcast::{channel, Sender};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
-use crate::{
- events::repo::broadcast,
- repo::channel::{self, Provider as _},
-};
+use crate::events::types;
-// Clones will share the same senders collection.
+// Clones will share the same sender.
#[derive(Clone)]
pub struct Broadcaster {
// The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's
// own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that
// lock it must be sync.
- senders: Arc<Mutex<HashMap<channel::Id, Sender<broadcast::Message>>>>,
+ senders: Arc<Mutex<Sender<types::ChannelEvent>>>,
}
-impl Broadcaster {
- pub async fn from_database(db: &SqlitePool) -> Result<Self, sqlx::Error> {
- let mut tx = db.begin().await?;
- let channels = tx.channels().all().await?;
- tx.commit().await?;
-
- let channels = channels.iter().map(|c| &c.id);
- let broadcaster = Self::new(channels);
- Ok(broadcaster)
- }
-
- fn new<'i>(channels: impl IntoIterator<Item = &'i channel::Id>) -> Self {
- let senders: HashMap<_, _> = channels
- .into_iter()
- .cloned()
- .map(|id| (id, Self::make_sender()))
- .collect();
+impl Default for Broadcaster {
+ fn default() -> Self {
+ let sender = Self::make_sender();
Self {
- senders: Arc::new(Mutex::new(senders)),
- }
- }
-
- // panic: if ``channel`` is already registered.
- pub fn register_channel(&self, channel: &channel::Id) {
- match self.senders().entry(channel.clone()) {
- // This ever happening indicates a serious logic error.
- Entry::Occupied(_) => panic!("duplicate channel registration for channel {channel}"),
- Entry::Vacant(entry) => {
- entry.insert(Self::make_sender());
- }
+ senders: Arc::new(Mutex::new(sender)),
}
}
+}
- // panic: if ``channel`` has not been previously registered, and was not
- // part of the initial set of channels.
- pub fn broadcast(&self, channel: &channel::Id, message: &broadcast::Message) {
- let tx = self.sender(channel);
+impl Broadcaster {
+ // panic: if ``message.channel.id`` has not been previously registered,
+ // and was not part of the initial set of channels.
+ pub fn broadcast(&self, message: &types::ChannelEvent) {
+ let tx = self.sender();
// Per the Tokio docs, the returned error is only used to indicate that
// there are no receivers. In this use case, that's fine; a lack of
@@ -71,15 +43,12 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn subscribe(
- &self,
- channel: &channel::Id,
- ) -> impl Stream<Item = broadcast::Message> + std::fmt::Debug {
- let rx = self.sender(channel).subscribe();
+ pub fn subscribe(&self) -> impl Stream<Item = types::ChannelEvent> + std::fmt::Debug {
+ let rx = self.sender().subscribe();
BroadcastStream::from(rx).scan((), |(), r| {
future::ready(match r {
- Ok(message) => Some(message),
+ Ok(event) => Some(event),
// Stop the stream here. This will disconnect SSE clients
// (see `routes.rs`), who will then resume from
// `Last-Event-ID`, allowing them to catch up by reading
@@ -92,17 +61,11 @@ impl Broadcaster {
})
}
- // panic: if ``channel`` has not been previously registered, and was not
- // part of the initial set of channels.
- fn sender(&self, channel: &channel::Id) -> Sender<broadcast::Message> {
- self.senders()[channel].clone()
- }
-
- fn senders(&self) -> MutexGuard<HashMap<channel::Id, Sender<broadcast::Message>>> {
- self.senders.lock().unwrap() // propagate panics when mutex is poisoned
+ fn sender(&self) -> Sender<types::ChannelEvent> {
+ self.senders.lock().unwrap().clone()
}
- fn make_sender() -> Sender<broadcast::Message> {
+ fn make_sender() -> Sender<types::ChannelEvent> {
// Queue depth of 16 chosen entirely arbitrarily. Don't read too much
// into it.
let (tx, _) = channel(16);