summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-13 01:46:53 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-13 02:42:27 -0400
commit43a20d43b09876082e54b550087f166aabdab82d (patch)
tree1c9518d45eed50b3db93a9f0df71c23f5b06f039 /src/channel/app.rs
parent3193a30ebcf6bafdeaf463eda0e7e82082dfe4b5 (diff)
Suggested fixes from Clippy, via nursery and pedantic sets.
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs63
1 files changed, 34 insertions, 29 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 4aa2622..adefa3e 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,5 +1,5 @@
use std::collections::{hash_map::Entry, HashMap};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, MutexGuard};
use futures::{
stream::{self, StreamExt as _, TryStreamExt as _},
@@ -25,7 +25,7 @@ pub struct Channels<'a> {
}
impl<'a> Channels<'a> {
- pub fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self {
+ pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self {
Self { db, broadcaster }
}
@@ -53,7 +53,7 @@ impl<'a> Channels<'a> {
let message = Message::from_login(login, message)?;
tx.commit().await?;
- self.broadcaster.broadcast(channel, message)?;
+ self.broadcaster.broadcast(channel, message);
Ok(())
}
@@ -61,7 +61,7 @@ impl<'a> Channels<'a> {
&self,
channel: &ChannelId,
) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> {
- let live_messages = self.broadcaster.listen(channel)?.map_err(BoxedError::from);
+ let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from);
let db = self.db.clone();
let mut tx = self.db.begin().await?;
@@ -159,13 +159,13 @@ pub struct Broadcaster {
}
impl Broadcaster {
- pub async fn from_database(db: &SqlitePool) -> Result<Broadcaster, BoxedError> {
+ pub async fn from_database(db: &SqlitePool) -> Result<Self, BoxedError> {
let mut tx = db.begin().await?;
let channels = tx.channels().all().await?;
tx.commit().await?;
let channels = channels.iter().map(|c| &c.id);
- let broadcaster = Broadcaster::new(channels);
+ let broadcaster = Self::new(channels);
Ok(broadcaster)
}
@@ -182,34 +182,45 @@ impl Broadcaster {
}
pub fn register_channel(&self, channel: &ChannelId) -> Result<(), RegisterError> {
- match self.senders.lock().unwrap().entry(channel.clone()) {
- Entry::Occupied(_) => Err(RegisterError::Duplicate),
- vacant => {
- vacant.or_insert_with(Self::make_sender);
- Ok(())
+ match self.senders().entry(channel.clone()) {
+ // This ever happening indicates a serious logic error.
+ Entry::Occupied(_) => return Err(RegisterError::Duplicate(channel.clone())),
+ Entry::Vacant(entry) => {
+ entry.insert(Self::make_sender());
}
}
+
+ Ok(())
}
- pub fn broadcast(&self, channel: &ChannelId, message: Message) -> Result<(), BroadcastError> {
- let lock = self.senders.lock().unwrap();
- let tx = lock.get(channel).ok_or(BroadcastError::Unregistered)?;
+ // panic: if ``channel`` has not been previously registered, and was not
+ // part of the initial set of channels.
+ pub fn broadcast(&self, channel: &ChannelId, message: Message) {
+ let tx = self.sender(channel);
// Per the Tokio docs, the returned error is only used to indicate that
// there are no receivers. In this use case, that's fine; a lack of
// listening consumers (chat clients) when a message is sent isn't an
// error.
let _ = tx.send(message);
- Ok(())
}
- pub fn listen(&self, channel: &ChannelId) -> Result<BroadcastStream<Message>, BroadcastError> {
- let lock = self.senders.lock().unwrap();
- let tx = lock.get(channel).ok_or(BroadcastError::Unregistered)?;
- let rx = tx.subscribe();
- let stream = BroadcastStream::from(rx);
+ // panic: if ``channel`` has not been previously registered, and was not
+ // part of the initial set of channels.
+ pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<Message> {
+ let rx = self.sender(channel).subscribe();
+
+ BroadcastStream::from(rx)
+ }
+
+ // panic: if ``channel`` has not been previously registered, and was not
+ // part of the initial set of channels.
+ fn sender(&self, channel: &ChannelId) -> Sender<Message> {
+ self.senders()[channel].clone()
+ }
- Ok(stream)
+ fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<Message>>> {
+ self.senders.lock().unwrap() // propagate panics when mutex is poisoned
}
fn make_sender() -> Sender<Message> {
@@ -222,12 +233,6 @@ impl Broadcaster {
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
- #[error("duplicate channel registered")]
- Duplicate,
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum BroadcastError {
- #[error("requested channel not registered")]
- Unregistered,
+ #[error("duplicate broadcast registration for channel {0}")]
+ Duplicate(ChannelId),
}