diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-13 01:46:53 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-13 02:42:27 -0400 |
| commit | 43a20d43b09876082e54b550087f166aabdab82d (patch) | |
| tree | 1c9518d45eed50b3db93a9f0df71c23f5b06f039 /src/channel/app.rs | |
| parent | 3193a30ebcf6bafdeaf463eda0e7e82082dfe4b5 (diff) | |
Suggested fixes from Clippy, via nursery and pedantic sets.
Diffstat (limited to 'src/channel/app.rs')
| -rw-r--r-- | src/channel/app.rs | 63 |
1 files changed, 34 insertions, 29 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 4aa2622..adefa3e 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,5 +1,5 @@ use std::collections::{hash_map::Entry, HashMap}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use futures::{ stream::{self, StreamExt as _, TryStreamExt as _}, @@ -25,7 +25,7 @@ pub struct Channels<'a> { } impl<'a> Channels<'a> { - pub fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { + pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { Self { db, broadcaster } } @@ -53,7 +53,7 @@ impl<'a> Channels<'a> { let message = Message::from_login(login, message)?; tx.commit().await?; - self.broadcaster.broadcast(channel, message)?; + self.broadcaster.broadcast(channel, message); Ok(()) } @@ -61,7 +61,7 @@ impl<'a> Channels<'a> { &self, channel: &ChannelId, ) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> { - let live_messages = self.broadcaster.listen(channel)?.map_err(BoxedError::from); + let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from); let db = self.db.clone(); let mut tx = self.db.begin().await?; @@ -159,13 +159,13 @@ pub struct Broadcaster { } impl Broadcaster { - pub async fn from_database(db: &SqlitePool) -> Result<Broadcaster, BoxedError> { + pub async fn from_database(db: &SqlitePool) -> Result<Self, BoxedError> { let mut tx = db.begin().await?; let channels = tx.channels().all().await?; tx.commit().await?; let channels = channels.iter().map(|c| &c.id); - let broadcaster = Broadcaster::new(channels); + let broadcaster = Self::new(channels); Ok(broadcaster) } @@ -182,34 +182,45 @@ impl Broadcaster { } pub fn register_channel(&self, channel: &ChannelId) -> Result<(), RegisterError> { - match self.senders.lock().unwrap().entry(channel.clone()) { - Entry::Occupied(_) => Err(RegisterError::Duplicate), - vacant => { - vacant.or_insert_with(Self::make_sender); - Ok(()) + match self.senders().entry(channel.clone()) { + // This ever happening indicates a serious logic error. + Entry::Occupied(_) => return Err(RegisterError::Duplicate(channel.clone())), + Entry::Vacant(entry) => { + entry.insert(Self::make_sender()); } } + + Ok(()) } - pub fn broadcast(&self, channel: &ChannelId, message: Message) -> Result<(), BroadcastError> { - let lock = self.senders.lock().unwrap(); - let tx = lock.get(channel).ok_or(BroadcastError::Unregistered)?; + // panic: if ``channel`` has not been previously registered, and was not + // part of the initial set of channels. + pub fn broadcast(&self, channel: &ChannelId, message: Message) { + let tx = self.sender(channel); // Per the Tokio docs, the returned error is only used to indicate that // there are no receivers. In this use case, that's fine; a lack of // listening consumers (chat clients) when a message is sent isn't an // error. let _ = tx.send(message); - Ok(()) } - pub fn listen(&self, channel: &ChannelId) -> Result<BroadcastStream<Message>, BroadcastError> { - let lock = self.senders.lock().unwrap(); - let tx = lock.get(channel).ok_or(BroadcastError::Unregistered)?; - let rx = tx.subscribe(); - let stream = BroadcastStream::from(rx); + // panic: if ``channel`` has not been previously registered, and was not + // part of the initial set of channels. + pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<Message> { + let rx = self.sender(channel).subscribe(); + + BroadcastStream::from(rx) + } + + // panic: if ``channel`` has not been previously registered, and was not + // part of the initial set of channels. + fn sender(&self, channel: &ChannelId) -> Sender<Message> { + self.senders()[channel].clone() + } - Ok(stream) + fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<Message>>> { + self.senders.lock().unwrap() // propagate panics when mutex is poisoned } fn make_sender() -> Sender<Message> { @@ -222,12 +233,6 @@ impl Broadcaster { #[derive(Debug, thiserror::Error)] pub enum RegisterError { - #[error("duplicate channel registered")] - Duplicate, -} - -#[derive(Debug, thiserror::Error)] -pub enum BroadcastError { - #[error("requested channel not registered")] - Unregistered, + #[error("duplicate broadcast registration for channel {0}")] + Duplicate(ChannelId), } |
