summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/app.rs6
-rw-r--r--src/channel/app.rs63
-rw-r--r--src/clock.rs4
-rw-r--r--src/error.rs4
-rw-r--r--src/id.rs4
-rw-r--r--src/index/app.rs2
-rw-r--r--src/login/app.rs2
-rw-r--r--src/login/extract/identity_token.rs8
8 files changed, 49 insertions, 44 deletions
diff --git a/src/app.rs b/src/app.rs
index f349fd4..7a43be9 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -22,15 +22,15 @@ impl App {
}
impl App {
- pub fn index(&self) -> Index {
+ pub const fn index(&self) -> Index {
Index::new(&self.db)
}
- pub fn logins(&self) -> Logins {
+ pub const fn logins(&self) -> Logins {
Logins::new(&self.db)
}
- pub fn channels(&self) -> Channels {
+ pub const fn channels(&self) -> Channels {
Channels::new(&self.db, &self.broadcaster)
}
}
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 4aa2622..adefa3e 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,5 +1,5 @@
use std::collections::{hash_map::Entry, HashMap};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, MutexGuard};
use futures::{
stream::{self, StreamExt as _, TryStreamExt as _},
@@ -25,7 +25,7 @@ pub struct Channels<'a> {
}
impl<'a> Channels<'a> {
- pub fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self {
+ pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self {
Self { db, broadcaster }
}
@@ -53,7 +53,7 @@ impl<'a> Channels<'a> {
let message = Message::from_login(login, message)?;
tx.commit().await?;
- self.broadcaster.broadcast(channel, message)?;
+ self.broadcaster.broadcast(channel, message);
Ok(())
}
@@ -61,7 +61,7 @@ impl<'a> Channels<'a> {
&self,
channel: &ChannelId,
) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> {
- let live_messages = self.broadcaster.listen(channel)?.map_err(BoxedError::from);
+ let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from);
let db = self.db.clone();
let mut tx = self.db.begin().await?;
@@ -159,13 +159,13 @@ pub struct Broadcaster {
}
impl Broadcaster {
- pub async fn from_database(db: &SqlitePool) -> Result<Broadcaster, BoxedError> {
+ pub async fn from_database(db: &SqlitePool) -> Result<Self, BoxedError> {
let mut tx = db.begin().await?;
let channels = tx.channels().all().await?;
tx.commit().await?;
let channels = channels.iter().map(|c| &c.id);
- let broadcaster = Broadcaster::new(channels);
+ let broadcaster = Self::new(channels);
Ok(broadcaster)
}
@@ -182,34 +182,45 @@ impl Broadcaster {
}
pub fn register_channel(&self, channel: &ChannelId) -> Result<(), RegisterError> {
- match self.senders.lock().unwrap().entry(channel.clone()) {
- Entry::Occupied(_) => Err(RegisterError::Duplicate),
- vacant => {
- vacant.or_insert_with(Self::make_sender);
- Ok(())
+ match self.senders().entry(channel.clone()) {
+ // This ever happening indicates a serious logic error.
+ Entry::Occupied(_) => return Err(RegisterError::Duplicate(channel.clone())),
+ Entry::Vacant(entry) => {
+ entry.insert(Self::make_sender());
}
}
+
+ Ok(())
}
- pub fn broadcast(&self, channel: &ChannelId, message: Message) -> Result<(), BroadcastError> {
- let lock = self.senders.lock().unwrap();
- let tx = lock.get(channel).ok_or(BroadcastError::Unregistered)?;
+ // panic: if ``channel`` has not been previously registered, and was not
+ // part of the initial set of channels.
+ pub fn broadcast(&self, channel: &ChannelId, message: Message) {
+ let tx = self.sender(channel);
// Per the Tokio docs, the returned error is only used to indicate that
// there are no receivers. In this use case, that's fine; a lack of
// listening consumers (chat clients) when a message is sent isn't an
// error.
let _ = tx.send(message);
- Ok(())
}
- pub fn listen(&self, channel: &ChannelId) -> Result<BroadcastStream<Message>, BroadcastError> {
- let lock = self.senders.lock().unwrap();
- let tx = lock.get(channel).ok_or(BroadcastError::Unregistered)?;
- let rx = tx.subscribe();
- let stream = BroadcastStream::from(rx);
+ // panic: if ``channel`` has not been previously registered, and was not
+ // part of the initial set of channels.
+ pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<Message> {
+ let rx = self.sender(channel).subscribe();
+
+ BroadcastStream::from(rx)
+ }
+
+ // panic: if ``channel`` has not been previously registered, and was not
+ // part of the initial set of channels.
+ fn sender(&self, channel: &ChannelId) -> Sender<Message> {
+ self.senders()[channel].clone()
+ }
- Ok(stream)
+ fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<Message>>> {
+ self.senders.lock().unwrap() // propagate panics when mutex is poisoned
}
fn make_sender() -> Sender<Message> {
@@ -222,12 +233,6 @@ impl Broadcaster {
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
- #[error("duplicate channel registered")]
- Duplicate,
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum BroadcastError {
- #[error("requested channel not registered")]
- Unregistered,
+ #[error("duplicate broadcast registration for channel {0}")]
+ Duplicate(ChannelId),
}
diff --git a/src/clock.rs b/src/clock.rs
index 7757925..f7e728f 100644
--- a/src/clock.rs
+++ b/src/clock.rs
@@ -26,14 +26,14 @@ impl<S> FromRequestParts<S> for RequestedAt
where
S: Send + Sync,
{
- type Rejection = <Extension<RequestedAt> as FromRequestParts<S>>::Rejection;
+ type Rejection = <Extension<Self> as FromRequestParts<S>>::Rejection;
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
// This is purely for ergonomics: it allows `RequestedAt` to be extracted
// without having to wrap it in `Extension<>`. Callers _can_ still do that,
// but they aren't forced to.
let Extension(requested_at) =
- Extension::<RequestedAt>::from_request_parts(parts, state).await?;
+ Extension::<Self>::from_request_parts(parts, state).await?;
Ok(requested_at)
}
diff --git a/src/error.rs b/src/error.rs
index 2d512e6..b700eaa 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -23,10 +23,10 @@ impl<E> From<E> for InternalError
where
E: Into<BoxedError>,
{
- fn from(_: E) -> InternalError {
+ fn from(_: E) -> Self {
// At some point it may be useful for this to record the originating
// error so that it can be logged… -oj
- InternalError
+ Self
}
}
diff --git a/src/id.rs b/src/id.rs
index c69b341..ce7f13b 100644
--- a/src/id.rs
+++ b/src/id.rs
@@ -48,8 +48,8 @@ impl Id {
.chars()
.chain(
(0..ID_SIZE)
- .flat_map(|_| ALPHABET.choose(&mut rng)) /* usize -> &char */
- .cloned(), /* &char -> char */
+ .filter_map(|_| ALPHABET.choose(&mut rng)) /* usize -> &char */
+ .copied(), /* &char -> char */
)
.collect::<String>();
T::from(Self(id))
diff --git a/src/index/app.rs b/src/index/app.rs
index d6eef18..6075c6f 100644
--- a/src/index/app.rs
+++ b/src/index/app.rs
@@ -10,7 +10,7 @@ pub struct Index<'a> {
}
impl<'a> Index<'a> {
- pub fn new(db: &'a SqlitePool) -> Self {
+ pub const fn new(db: &'a SqlitePool) -> Self {
Self { db }
}
diff --git a/src/login/app.rs b/src/login/app.rs
index 85fa024..cd65f35 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -14,7 +14,7 @@ pub struct Logins<'a> {
}
impl<'a> Logins<'a> {
- pub fn new(db: &'a SqlitePool) -> Self {
+ pub const fn new(db: &'a SqlitePool) -> Self {
Self { db }
}
diff --git a/src/login/extract/identity_token.rs b/src/login/extract/identity_token.rs
index c322f7b..3be3f09 100644
--- a/src/login/extract/identity_token.rs
+++ b/src/login/extract/identity_token.rs
@@ -31,7 +31,7 @@ impl IdentityToken {
.permanent()
.build();
- IdentityToken {
+ Self {
cookies: self.cookies.add(identity_cookie),
}
}
@@ -39,7 +39,7 @@ impl IdentityToken {
/// Remove the identity secret and ensure that it will be cleared when this
/// extractor is included in a response.
pub fn clear(self) -> Self {
- IdentityToken {
+ Self {
cookies: self.cookies.remove(IDENTITY_COOKIE),
}
}
@@ -56,7 +56,7 @@ where
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
let cookies = CookieJar::from_request_parts(parts, state).await?;
- Ok(IdentityToken { cookies })
+ Ok(Self { cookies })
}
}
@@ -64,7 +64,7 @@ impl IntoResponseParts for IdentityToken {
type Error = Infallible;
fn into_response_parts(self, res: ResponseParts) -> Result<ResponseParts, Self::Error> {
- let IdentityToken { cookies } = self;
+ let Self { cookies } = self;
cookies.into_response_parts(res)
}
}