From 43a20d43b09876082e54b550087f166aabdab82d Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 13 Sep 2024 01:46:53 -0400 Subject: Suggested fixes from Clippy, via nursery and pedantic sets. --- src/app.rs | 6 ++-- src/channel/app.rs | 63 ++++++++++++++++++++----------------- src/clock.rs | 4 +-- src/error.rs | 4 +-- src/id.rs | 4 +-- src/index/app.rs | 2 +- src/login/app.rs | 2 +- src/login/extract/identity_token.rs | 8 ++--- 8 files changed, 49 insertions(+), 44 deletions(-) diff --git a/src/app.rs b/src/app.rs index f349fd4..7a43be9 100644 --- a/src/app.rs +++ b/src/app.rs @@ -22,15 +22,15 @@ impl App { } impl App { - pub fn index(&self) -> Index { + pub const fn index(&self) -> Index { Index::new(&self.db) } - pub fn logins(&self) -> Logins { + pub const fn logins(&self) -> Logins { Logins::new(&self.db) } - pub fn channels(&self) -> Channels { + pub const fn channels(&self) -> Channels { Channels::new(&self.db, &self.broadcaster) } } diff --git a/src/channel/app.rs b/src/channel/app.rs index 4aa2622..adefa3e 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,5 +1,5 @@ use std::collections::{hash_map::Entry, HashMap}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use futures::{ stream::{self, StreamExt as _, TryStreamExt as _}, @@ -25,7 +25,7 @@ pub struct Channels<'a> { } impl<'a> Channels<'a> { - pub fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { + pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { Self { db, broadcaster } } @@ -53,7 +53,7 @@ impl<'a> Channels<'a> { let message = Message::from_login(login, message)?; tx.commit().await?; - self.broadcaster.broadcast(channel, message)?; + self.broadcaster.broadcast(channel, message); Ok(()) } @@ -61,7 +61,7 @@ impl<'a> Channels<'a> { &self, channel: &ChannelId, ) -> Result>, BoxedError> { - let live_messages = self.broadcaster.listen(channel)?.map_err(BoxedError::from); + let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from); let db = self.db.clone(); let mut tx = self.db.begin().await?; @@ -159,13 +159,13 @@ pub struct Broadcaster { } impl Broadcaster { - pub async fn from_database(db: &SqlitePool) -> Result { + pub async fn from_database(db: &SqlitePool) -> Result { let mut tx = db.begin().await?; let channels = tx.channels().all().await?; tx.commit().await?; let channels = channels.iter().map(|c| &c.id); - let broadcaster = Broadcaster::new(channels); + let broadcaster = Self::new(channels); Ok(broadcaster) } @@ -182,34 +182,45 @@ impl Broadcaster { } pub fn register_channel(&self, channel: &ChannelId) -> Result<(), RegisterError> { - match self.senders.lock().unwrap().entry(channel.clone()) { - Entry::Occupied(_) => Err(RegisterError::Duplicate), - vacant => { - vacant.or_insert_with(Self::make_sender); - Ok(()) + match self.senders().entry(channel.clone()) { + // This ever happening indicates a serious logic error. + Entry::Occupied(_) => return Err(RegisterError::Duplicate(channel.clone())), + Entry::Vacant(entry) => { + entry.insert(Self::make_sender()); } } + + Ok(()) } - pub fn broadcast(&self, channel: &ChannelId, message: Message) -> Result<(), BroadcastError> { - let lock = self.senders.lock().unwrap(); - let tx = lock.get(channel).ok_or(BroadcastError::Unregistered)?; + // panic: if ``channel`` has not been previously registered, and was not + // part of the initial set of channels. + pub fn broadcast(&self, channel: &ChannelId, message: Message) { + let tx = self.sender(channel); // Per the Tokio docs, the returned error is only used to indicate that // there are no receivers. In this use case, that's fine; a lack of // listening consumers (chat clients) when a message is sent isn't an // error. let _ = tx.send(message); - Ok(()) } - pub fn listen(&self, channel: &ChannelId) -> Result, BroadcastError> { - let lock = self.senders.lock().unwrap(); - let tx = lock.get(channel).ok_or(BroadcastError::Unregistered)?; - let rx = tx.subscribe(); - let stream = BroadcastStream::from(rx); + // panic: if ``channel`` has not been previously registered, and was not + // part of the initial set of channels. + pub fn listen(&self, channel: &ChannelId) -> BroadcastStream { + let rx = self.sender(channel).subscribe(); + + BroadcastStream::from(rx) + } + + // panic: if ``channel`` has not been previously registered, and was not + // part of the initial set of channels. + fn sender(&self, channel: &ChannelId) -> Sender { + self.senders()[channel].clone() + } - Ok(stream) + fn senders(&self) -> MutexGuard>> { + self.senders.lock().unwrap() // propagate panics when mutex is poisoned } fn make_sender() -> Sender { @@ -222,12 +233,6 @@ impl Broadcaster { #[derive(Debug, thiserror::Error)] pub enum RegisterError { - #[error("duplicate channel registered")] - Duplicate, -} - -#[derive(Debug, thiserror::Error)] -pub enum BroadcastError { - #[error("requested channel not registered")] - Unregistered, + #[error("duplicate broadcast registration for channel {0}")] + Duplicate(ChannelId), } diff --git a/src/clock.rs b/src/clock.rs index 7757925..f7e728f 100644 --- a/src/clock.rs +++ b/src/clock.rs @@ -26,14 +26,14 @@ impl FromRequestParts for RequestedAt where S: Send + Sync, { - type Rejection = as FromRequestParts>::Rejection; + type Rejection = as FromRequestParts>::Rejection; async fn from_request_parts(parts: &mut Parts, state: &S) -> Result { // This is purely for ergonomics: it allows `RequestedAt` to be extracted // without having to wrap it in `Extension<>`. Callers _can_ still do that, // but they aren't forced to. let Extension(requested_at) = - Extension::::from_request_parts(parts, state).await?; + Extension::::from_request_parts(parts, state).await?; Ok(requested_at) } diff --git a/src/error.rs b/src/error.rs index 2d512e6..b700eaa 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,10 +23,10 @@ impl From for InternalError where E: Into, { - fn from(_: E) -> InternalError { + fn from(_: E) -> Self { // At some point it may be useful for this to record the originating // error so that it can be logged… -oj - InternalError + Self } } diff --git a/src/id.rs b/src/id.rs index c69b341..ce7f13b 100644 --- a/src/id.rs +++ b/src/id.rs @@ -48,8 +48,8 @@ impl Id { .chars() .chain( (0..ID_SIZE) - .flat_map(|_| ALPHABET.choose(&mut rng)) /* usize -> &char */ - .cloned(), /* &char -> char */ + .filter_map(|_| ALPHABET.choose(&mut rng)) /* usize -> &char */ + .copied(), /* &char -> char */ ) .collect::(); T::from(Self(id)) diff --git a/src/index/app.rs b/src/index/app.rs index d6eef18..6075c6f 100644 --- a/src/index/app.rs +++ b/src/index/app.rs @@ -10,7 +10,7 @@ pub struct Index<'a> { } impl<'a> Index<'a> { - pub fn new(db: &'a SqlitePool) -> Self { + pub const fn new(db: &'a SqlitePool) -> Self { Self { db } } diff --git a/src/login/app.rs b/src/login/app.rs index 85fa024..cd65f35 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -14,7 +14,7 @@ pub struct Logins<'a> { } impl<'a> Logins<'a> { - pub fn new(db: &'a SqlitePool) -> Self { + pub const fn new(db: &'a SqlitePool) -> Self { Self { db } } diff --git a/src/login/extract/identity_token.rs b/src/login/extract/identity_token.rs index c322f7b..3be3f09 100644 --- a/src/login/extract/identity_token.rs +++ b/src/login/extract/identity_token.rs @@ -31,7 +31,7 @@ impl IdentityToken { .permanent() .build(); - IdentityToken { + Self { cookies: self.cookies.add(identity_cookie), } } @@ -39,7 +39,7 @@ impl IdentityToken { /// Remove the identity secret and ensure that it will be cleared when this /// extractor is included in a response. pub fn clear(self) -> Self { - IdentityToken { + Self { cookies: self.cookies.remove(IDENTITY_COOKIE), } } @@ -56,7 +56,7 @@ where async fn from_request_parts(parts: &mut Parts, state: &S) -> Result { let cookies = CookieJar::from_request_parts(parts, state).await?; - Ok(IdentityToken { cookies }) + Ok(Self { cookies }) } } @@ -64,7 +64,7 @@ impl IntoResponseParts for IdentityToken { type Error = Infallible; fn into_response_parts(self, res: ResponseParts) -> Result { - let IdentityToken { cookies } = self; + let Self { cookies } = self; cookies.into_response_parts(res) } } -- cgit v1.2.3