summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-13 21:19:40 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-13 23:12:31 -0400
commit388a3d5a925aef7ff39339454ae0d720e05f038e (patch)
tree672595cd179eeca7c9e61606704e453202f9efb0 /src/channel/app.rs
parent72daa3f510ea381b7e3606c75f03ce6000dc780c (diff)
Generate the required structure for broadcasting from a join, not from O(n) queries.
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs97
1 files changed, 11 insertions, 86 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index c060b23..e242c2f 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -11,13 +11,9 @@ use tokio_stream::wrappers::BroadcastStream;
use super::repo::{
channels::{Id as ChannelId, Provider as _},
- messages::{Id as MessageId, Message as StoredMessage, Provider as _},
-};
-use crate::{
- clock::DateTime,
- error::BoxedError,
- login::repo::logins::{Login, Provider as _},
+ messages::{BroadcastMessage, Provider as _},
};
+use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login};
pub struct Channels<'a> {
db: &'a SqlitePool,
@@ -50,7 +46,6 @@ impl<'a> Channels<'a> {
.messages()
.create(&login.id, channel, body, sent_at)
.await?;
- let message = Message::from_login(login, message);
tx.commit().await?;
self.broadcaster.broadcast(channel, message);
@@ -60,84 +55,14 @@ impl<'a> Channels<'a> {
pub async fn events(
&self,
channel: &ChannelId,
- ) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> {
+ ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>>, BoxedError> {
let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from);
- let db = self.db.clone();
let mut tx = self.db.begin().await?;
- let stored_messages = tx.messages().all(channel).await?;
- let stored_messages = stream::iter(stored_messages).then(move |msg| {
- // The exact series of moves and clones here is the result of trial
- // and error, and is likely the best I can do, given:
- //
- // * This closure _can't_ keep a reference to self, for lifetime
- // reasons;
- // * The closure will be executed multiple times, so it can't give
- // up `db`; and
- // * The returned future can't keep a reference to `db` as doing
- // so would allow refs to the closure's `db` to outlive the
- // closure itself.
- //
- // Fortunately, cloning the pool is acceptable - sqlx pools were
- // designed to be cloned and the only thing actually cloned is a
- // single `Arc`. This whole chain of clones just ends up producing
- // cheap handles to a single underlying "real" pool.
- let db = db.clone();
- async move {
- let mut tx = db.begin().await?;
- let msg = Message::from_stored(&mut tx, msg).await?;
- tx.commit().await?;
-
- Ok(msg)
- }
- });
+ let stored_messages = tx.messages().for_replay(channel).await?;
tx.commit().await?;
- Ok(stored_messages.chain(live_messages))
- }
-}
-
-#[derive(Clone, Debug, serde::Serialize)]
-pub struct Message {
- pub id: MessageId,
- pub sender: Login,
- pub body: String,
- pub sent_at: DateTime,
-}
-
-impl Message {
- async fn from_stored(
- tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
- message: StoredMessage,
- ) -> Result<Self, BoxedError> {
- let sender = tx.logins().by_id(&message.sender).await?;
-
- let message = Self {
- sender,
- id: message.id,
- body: message.body,
- sent_at: message.sent_at,
- };
-
- Ok(message)
- }
-
- fn from_login(sender: &Login, message: StoredMessage) -> Self {
- // Panic as this logic is enforced by the caller anyways. This "can't
- // happen," other than via programming mistakes, and cannot be fixed
- // by config changes or changing user behaviours.
- assert_eq!(
- message.sender, sender.id,
- "broadcast message must have the same sender ({}) as the stored message ({})",
- sender.id, message.sender,
- );
-
- Self {
- sender: sender.clone(),
- id: message.id,
- body: message.body,
- sent_at: message.sent_at,
- }
+ Ok(stream::iter(stored_messages).map(Ok).chain(live_messages))
}
}
@@ -147,7 +72,7 @@ pub struct Broadcaster {
// The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's
// own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that
// lock it must be sync.
- senders: Arc<Mutex<HashMap<ChannelId, Sender<Message>>>>,
+ senders: Arc<Mutex<HashMap<ChannelId, Sender<BroadcastMessage>>>>,
}
impl Broadcaster {
@@ -186,7 +111,7 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn broadcast(&self, channel: &ChannelId, message: Message) {
+ pub fn broadcast(&self, channel: &ChannelId, message: BroadcastMessage) {
let tx = self.sender(channel);
// Per the Tokio docs, the returned error is only used to indicate that
@@ -198,7 +123,7 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<Message> {
+ pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<BroadcastMessage> {
let rx = self.sender(channel).subscribe();
BroadcastStream::from(rx)
@@ -206,15 +131,15 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- fn sender(&self, channel: &ChannelId) -> Sender<Message> {
+ fn sender(&self, channel: &ChannelId) -> Sender<BroadcastMessage> {
self.senders()[channel].clone()
}
- fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<Message>>> {
+ fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<BroadcastMessage>>> {
self.senders.lock().unwrap() // propagate panics when mutex is poisoned
}
- fn make_sender() -> Sender<Message> {
+ fn make_sender() -> Sender<BroadcastMessage> {
// Queue depth of 16 chosen entirely arbitrarily. Don't read too much
// into it.
let (tx, _) = channel(16);