diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-13 01:26:56 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-13 02:42:27 -0400 |
| commit | 3193a30ebcf6bafdeaf463eda0e7e82082dfe4b5 (patch) | |
| tree | 2fb3ea84923aecf0ec1f820408bdc670b1247c95 /src/channel/app.rs | |
| parent | 067e3da1900d052a416c56e1c047640aa23441ae (diff) | |
Embed the sender's whole login (id and name) in messages, drop the redundant channel ID.
Diffstat (limited to 'src/channel/app.rs')
| -rw-r--r-- | src/channel/app.rs | 89 |
1 files changed, 85 insertions, 4 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 7b02300..4aa2622 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -11,9 +11,13 @@ use tokio_stream::wrappers::BroadcastStream; use super::repo::{ channels::{Id as ChannelId, Provider as _}, - messages::{Message, Provider as _}, + messages::{Id as MessageId, Message as StoredMessage, Provider as _}, +}; +use crate::{ + clock::DateTime, + error::BoxedError, + login::repo::logins::{Id as LoginId, Login, Provider as _}, }; -use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login}; pub struct Channels<'a> { db: &'a SqlitePool, @@ -46,6 +50,7 @@ impl<'a> Channels<'a> { .messages() .create(&login.id, channel, body, sent_at) .await?; + let message = Message::from_login(login, message)?; tx.commit().await?; self.broadcaster.broadcast(channel, message)?; @@ -58,16 +63,92 @@ impl<'a> Channels<'a> { ) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> { let live_messages = self.broadcaster.listen(channel)?.map_err(BoxedError::from); + let db = self.db.clone(); let mut tx = self.db.begin().await?; let stored_messages = tx.messages().all(channel).await?; + let stored_messages = stream::iter(stored_messages).then(move |msg| { + // The exact series of moves and clones here is the result of trial + // and error, and is likely the best I can do, given: + // + // * This closure _can't_ keep a reference to self, for lifetime + // reasons; + // * The closure will be executed multiple times, so it can't give + // up `db`; and + // * The returned future can't keep a reference to `db` as doing + // so would allow refs to the closure's `db` to outlive the + // closure itself. + // + // Fortunately, cloning the pool is acceptable - sqlx pools were + // designed to be cloned and the only thing actually cloned is a + // single `Arc`. This whole chain of clones just ends up producing + // cheap handles to a single underlying "real" pool. + let db = db.clone(); + async move { + let mut tx = db.begin().await?; + let msg = Message::from_stored(&mut tx, msg).await?; + tx.commit().await?; + + Ok(msg) + } + }); tx.commit().await?; - let stored_messages = stream::iter(stored_messages.into_iter().map(Ok)); - Ok(stored_messages.chain(live_messages)) } } +#[derive(Clone, Debug, serde::Serialize)] +pub struct Message { + pub id: MessageId, + pub sender: Login, + pub body: String, + pub sent_at: DateTime, +} + +impl Message { + async fn from_stored( + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + message: StoredMessage, + ) -> Result<Self, BoxedError> { + let sender = tx.logins().by_id(&message.sender).await?; + + let message = Self { + sender, + id: message.id, + body: message.body, + sent_at: message.sent_at, + }; + + Ok(message) + } + + fn from_login(sender: &Login, message: StoredMessage) -> Result<Self, MessageError> { + if sender.id != message.sender { + // This functionally can't happen, but the funny thing about "This + // can never happen" comments is that they're usually wrong. + return Err(MessageError::LoginMismatched { + sender: sender.id.clone(), + message: message.sender, + }); + } + + let message = Self { + sender: sender.clone(), + id: message.id, + body: message.body, + sent_at: message.sent_at, + }; + + Ok(message) + } +} + +#[derive(Debug, thiserror::Error)] +enum MessageError { + #[error("sender login id {sender} did not match message login id {message}")] + LoginMismatched { sender: LoginId, message: LoginId }, +} + // Clones will share the same senders collection. #[derive(Clone)] pub struct Broadcaster { |
