| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-13 21:19:40 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-13 23:12:31 -0400 |
| commit | 388a3d5a925aef7ff39339454ae0d720e05f038e (patch) | |
| tree | 672595cd179eeca7c9e61606704e453202f9efb0 /src/channel/app.rs | |
| parent | 72daa3f510ea381b7e3606c75f03ce6000dc780c (diff) | |
Generate the required structure for broadcasting from a join, not from O(n) queries.
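The `for_replay` provider this commit switches to is defined on the messages repository module (`super::repo::messages`), which is outside this diff, so its implementation isn't visible here. The sketch below is a hypothetical illustration of the idea only: the table layout, column names, flattened `BroadcastMessage` fields, and sqlx details are all assumptions. The point is that a single JOIN returns each stored message together with its sender, where the old code issued one `logins().by_id()` query per message.

```rust
// Hypothetical sketch: the real `for_replay` is defined on the messages
// provider and is not part of this diff. Table names, columns, and the
// flattened field types below are assumptions made for illustration.
use sqlx::SqlitePool;

#[derive(Debug, sqlx::FromRow)]
pub struct BroadcastMessage {
    pub id: String,
    pub body: String,
    pub sent_at: String,    // simplified; the real field is a DateTime
    pub sender_id: String,  // simplified; the real message embeds a Login
    pub sender_name: String,
}

pub async fn for_replay(
    db: &SqlitePool,
    channel: &str,
) -> Result<Vec<BroadcastMessage>, sqlx::Error> {
    // One query produces every replayable message with its sender already
    // attached, instead of one `logins` lookup per stored message.
    sqlx::query_as::<_, BroadcastMessage>(
        r#"
        SELECT m.id, m.body, m.sent_at, l.id AS sender_id, l.name AS sender_name
        FROM messages m
        JOIN logins l ON l.id = m.sender
        WHERE m.channel = ?
        ORDER BY m.sent_at
        "#,
    )
    .bind(channel)
    .fetch_all(db)
    .await
}
```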
Diffstat (limited to 'src/channel/app.rs')
| -rw-r--r-- | src/channel/app.rs | 97 |
1 files changed, 11 insertions, 86 deletions
```diff
diff --git a/src/channel/app.rs b/src/channel/app.rs
index c060b23..e242c2f 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -11,13 +11,9 @@ use tokio_stream::wrappers::BroadcastStream;
 
 use super::repo::{
     channels::{Id as ChannelId, Provider as _},
-    messages::{Id as MessageId, Message as StoredMessage, Provider as _},
-};
-use crate::{
-    clock::DateTime,
-    error::BoxedError,
-    login::repo::logins::{Login, Provider as _},
+    messages::{BroadcastMessage, Provider as _},
 };
+use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login};
 
 pub struct Channels<'a> {
     db: &'a SqlitePool,
@@ -50,7 +46,6 @@ impl<'a> Channels<'a> {
             .messages()
             .create(&login.id, channel, body, sent_at)
             .await?;
-        let message = Message::from_login(login, message);
         tx.commit().await?;
 
         self.broadcaster.broadcast(channel, message);
@@ -60,84 +55,14 @@ impl<'a> Channels<'a> {
     pub async fn events(
         &self,
         channel: &ChannelId,
-    ) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> {
+    ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>>, BoxedError> {
         let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from);
 
-        let db = self.db.clone();
         let mut tx = self.db.begin().await?;
-        let stored_messages = tx.messages().all(channel).await?;
-        let stored_messages = stream::iter(stored_messages).then(move |msg| {
-            // The exact series of moves and clones here is the result of trial
-            // and error, and is likely the best I can do, given:
-            //
-            // * This closure _can't_ keep a reference to self, for lifetime
-            //   reasons;
-            // * The closure will be executed multiple times, so it can't give
-            //   up `db`; and
-            // * The returned future can't keep a reference to `db` as doing
-            //   so would allow refs to the closure's `db` to outlive the
-            //   closure itself.
-            //
-            // Fortunately, cloning the pool is acceptable - sqlx pools were
-            // designed to be cloned and the only thing actually cloned is a
-            // single `Arc`. This whole chain of clones just ends up producing
-            // cheap handles to a single underlying "real" pool.
-            let db = db.clone();
-            async move {
-                let mut tx = db.begin().await?;
-                let msg = Message::from_stored(&mut tx, msg).await?;
-                tx.commit().await?;
-
-                Ok(msg)
-            }
-        });
+        let stored_messages = tx.messages().for_replay(channel).await?;
         tx.commit().await?;
 
-        Ok(stored_messages.chain(live_messages))
-    }
-}
-
-#[derive(Clone, Debug, serde::Serialize)]
-pub struct Message {
-    pub id: MessageId,
-    pub sender: Login,
-    pub body: String,
-    pub sent_at: DateTime,
-}
-
-impl Message {
-    async fn from_stored(
-        tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
-        message: StoredMessage,
-    ) -> Result<Self, BoxedError> {
-        let sender = tx.logins().by_id(&message.sender).await?;
-
-        let message = Self {
-            sender,
-            id: message.id,
-            body: message.body,
-            sent_at: message.sent_at,
-        };
-
-        Ok(message)
-    }
-
-    fn from_login(sender: &Login, message: StoredMessage) -> Self {
-        // Panic as this logic is enforced by the caller anyways. This "can't
-        // happen," other than via programming mistakes, and cannot be fixed
-        // by config changes or changing user behaviours.
-        assert_eq!(
-            message.sender, sender.id,
-            "broadcast message must have the same sender ({}) as the stored message ({})",
-            sender.id, message.sender,
-        );
-
-        Self {
-            sender: sender.clone(),
-            id: message.id,
-            body: message.body,
-            sent_at: message.sent_at,
-        }
+        Ok(stream::iter(stored_messages).map(Ok).chain(live_messages))
     }
 }
 
@@ -147,7 +72,7 @@ pub struct Broadcaster {
     // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's
     // own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that
     // lock it must be sync.
-    senders: Arc<Mutex<HashMap<ChannelId, Sender<Message>>>>,
+    senders: Arc<Mutex<HashMap<ChannelId, Sender<BroadcastMessage>>>>,
 }
 
 impl Broadcaster {
@@ -186,7 +111,7 @@ impl Broadcaster {
 
     // panic: if ``channel`` has not been previously registered, and was not
     // part of the initial set of channels.
-    pub fn broadcast(&self, channel: &ChannelId, message: Message) {
+    pub fn broadcast(&self, channel: &ChannelId, message: BroadcastMessage) {
         let tx = self.sender(channel);
 
         // Per the Tokio docs, the returned error is only used to indicate that
@@ -198,7 +123,7 @@ impl Broadcaster {
 
     // panic: if ``channel`` has not been previously registered, and was not
     // part of the initial set of channels.
-    pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<Message> {
+    pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<BroadcastMessage> {
         let rx = self.sender(channel).subscribe();
 
         BroadcastStream::from(rx)
@@ -206,15 +131,15 @@ impl Broadcaster {
 
     // panic: if ``channel`` has not been previously registered, and was not
     // part of the initial set of channels.
-    fn sender(&self, channel: &ChannelId) -> Sender<Message> {
+    fn sender(&self, channel: &ChannelId) -> Sender<BroadcastMessage> {
         self.senders()[channel].clone()
     }
 
-    fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<Message>>> {
+    fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<BroadcastMessage>>> {
        self.senders.lock().unwrap() // propagate panics when mutex is poisoned
     }
 
-    fn make_sender() -> Sender<Message> {
+    fn make_sender() -> Sender<BroadcastMessage> {
         // Queue depth of 16 chosen entirely arbitrarily. Don't read too much
         // into it.
         let (tx, _) = channel(16);
```
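After the change, `events` simply concatenates two streams: a finite stream over the rows `for_replay` returned, followed by the live broadcast subscription for the channel. Below is a minimal, self-contained sketch of that replay-then-live pattern, with `String` standing in for `BroadcastMessage` and all names chosen for illustration; it is not code from this repository.

```rust
// Illustration only: `String` stands in for the real `BroadcastMessage`,
// and `BoxedError` here is a plain boxed error, which may differ from the
// crate's own error type.
use futures::stream::{self, Stream, StreamExt, TryStreamExt};
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;

type BoxedError = Box<dyn std::error::Error + Send + Sync>;

fn replay_then_live(
    stored: Vec<String>,
    live: broadcast::Receiver<String>,
) -> impl Stream<Item = Result<String, BoxedError>> {
    // A lagged receiver yields an error item; surface it as a boxed error,
    // matching the shape of the replayed items.
    let live = BroadcastStream::new(live).map_err(BoxedError::from);

    // Emit the stored history first, then whatever is broadcast afterwards.
    stream::iter(stored).map(Ok).chain(live)
}

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel(16);
    let events = replay_then_live(vec!["old message".into()], rx);

    tx.send("new message".into()).unwrap();
    drop(tx); // closing the sender ends the live half of the stream

    let mut events = Box::pin(events);
    while let Some(msg) = events.next().await {
        println!("{:?}", msg); // prints the replayed item, then the live one
    }
}
```

As with the real `Broadcaster`, the subscription has a bounded queue (capacity 16 here), so a consumer that falls more than 16 messages behind receives a `Lagged` error item reporting how many messages were skipped rather than blocking the sender.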
