diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-13 01:26:56 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-13 02:42:27 -0400 |
| commit | 3193a30ebcf6bafdeaf463eda0e7e82082dfe4b5 (patch) | |
| tree | 2fb3ea84923aecf0ec1f820408bdc670b1247c95 /src | |
| parent | 067e3da1900d052a416c56e1c047640aa23441ae (diff) | |
Embed the sender's whole login (id and name) in messages, drop the redundant channel ID.
Diffstat (limited to 'src')
| -rw-r--r-- | src/channel/app.rs | 89 | ||||
| -rw-r--r-- | src/channel/repo/messages.rs | 10 | ||||
| -rw-r--r-- | src/channel/routes.rs | 12 | ||||
| -rw-r--r-- | src/login/repo/logins.rs | 28 |
4 files changed, 123 insertions, 16 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 7b02300..4aa2622 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -11,9 +11,13 @@ use tokio_stream::wrappers::BroadcastStream; use super::repo::{ channels::{Id as ChannelId, Provider as _}, - messages::{Message, Provider as _}, + messages::{Id as MessageId, Message as StoredMessage, Provider as _}, +}; +use crate::{ + clock::DateTime, + error::BoxedError, + login::repo::logins::{Id as LoginId, Login, Provider as _}, }; -use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login}; pub struct Channels<'a> { db: &'a SqlitePool, @@ -46,6 +50,7 @@ impl<'a> Channels<'a> { .messages() .create(&login.id, channel, body, sent_at) .await?; + let message = Message::from_login(login, message)?; tx.commit().await?; self.broadcaster.broadcast(channel, message)?; @@ -58,16 +63,92 @@ impl<'a> Channels<'a> { ) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> { let live_messages = self.broadcaster.listen(channel)?.map_err(BoxedError::from); + let db = self.db.clone(); let mut tx = self.db.begin().await?; let stored_messages = tx.messages().all(channel).await?; + let stored_messages = stream::iter(stored_messages).then(move |msg| { + // The exact series of moves and clones here is the result of trial + // and error, and is likely the best I can do, given: + // + // * This closure _can't_ keep a reference to self, for lifetime + // reasons; + // * The closure will be executed multiple times, so it can't give + // up `db`; and + // * The returned future can't keep a reference to `db` as doing + // so would allow refs to the closure's `db` to outlive the + // closure itself. + // + // Fortunately, cloning the pool is acceptable - sqlx pools were + // designed to be cloned and the only thing actually cloned is a + // single `Arc`. This whole chain of clones just ends up producing + // cheap handles to a single underlying "real" pool. + let db = db.clone(); + async move { + let mut tx = db.begin().await?; + let msg = Message::from_stored(&mut tx, msg).await?; + tx.commit().await?; + + Ok(msg) + } + }); tx.commit().await?; - let stored_messages = stream::iter(stored_messages.into_iter().map(Ok)); - Ok(stored_messages.chain(live_messages)) } } +#[derive(Clone, Debug, serde::Serialize)] +pub struct Message { + pub id: MessageId, + pub sender: Login, + pub body: String, + pub sent_at: DateTime, +} + +impl Message { + async fn from_stored( + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + message: StoredMessage, + ) -> Result<Self, BoxedError> { + let sender = tx.logins().by_id(&message.sender).await?; + + let message = Self { + sender, + id: message.id, + body: message.body, + sent_at: message.sent_at, + }; + + Ok(message) + } + + fn from_login(sender: &Login, message: StoredMessage) -> Result<Self, MessageError> { + if sender.id != message.sender { + // This functionally can't happen, but the funny thing about "This + // can never happen" comments is that they're usually wrong. + return Err(MessageError::LoginMismatched { + sender: sender.id.clone(), + message: message.sender, + }); + } + + let message = Self { + sender: sender.clone(), + id: message.id, + body: message.body, + sent_at: message.sent_at, + }; + + Ok(message) + } +} + +#[derive(Debug, thiserror::Error)] +enum MessageError { + #[error("sender login id {sender} did not match message login id {message}")] + LoginMismatched { sender: LoginId, message: LoginId }, +} + // Clones will share the same senders collection. #[derive(Clone)] pub struct Broadcaster { diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs index bdb0d29..0d74ea9 100644 --- a/src/channel/repo/messages.rs +++ b/src/channel/repo/messages.rs @@ -19,11 +19,13 @@ impl<'c> Provider for Transaction<'c, Sqlite> { pub struct Messages<'t>(&'t mut SqliteConnection); -#[derive(Clone, Debug, serde::Serialize)] +#[derive(Clone, Debug)] pub struct Message { pub id: Id, pub sender: LoginId, - pub channel: ChannelId, + // Field not actually used at this time, but you can reinstate it if you + // need to. It's not omitted out of any greater design intention. + // pub channel: ChannelId, pub body: String, pub sent_at: DateTime, } @@ -47,7 +49,7 @@ impl<'c> Messages<'c> { returning id as "id: Id", sender as "sender: LoginId", - channel as "channel: ChannelId", + -- channel as "channel: ChannelId", body, sent_at as "sent_at: DateTime" "#, @@ -70,7 +72,7 @@ impl<'c> Messages<'c> { select id as "id: Id", sender as "sender: LoginId", - channel as "channel: ChannelId", + -- channel as "channel: ChannelId", body, sent_at as "sent_at: DateTime" from message diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 83c733c..0f95c69 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -8,10 +8,13 @@ use axum::{ routing::{get, post}, Router, }; -use futures::stream::{StreamExt as _, TryStreamExt as _}; +use futures::{future, stream::TryStreamExt as _}; use super::repo::channels::Id as ChannelId; -use crate::{app::App, clock::RequestedAt, error::InternalError, login::repo::logins::Login}; +use crate::{ + app::App, clock::RequestedAt, error::BoxedError, error::InternalError, + login::repo::logins::Login, +}; pub fn router() -> Router<App> { Router::new() @@ -63,10 +66,7 @@ async fn on_events( .channels() .events(&channel) .await? - .map(|msg| match msg { - Ok(msg) => Ok(serde_json::to_string(&msg)?), - Err(err) => Err(err), - }) + .and_then(|msg| future::ready(serde_json::to_string(&msg).map_err(BoxedError::from))) .map_ok(|msg| sse::Event::default().data(&msg)); Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default())) diff --git a/src/login/repo/logins.rs b/src/login/repo/logins.rs index 142d8fb..e1c5057 100644 --- a/src/login/repo/logins.rs +++ b/src/login/repo/logins.rs @@ -18,7 +18,7 @@ pub struct Logins<'t>(&'t mut SqliteConnection); // This also implements FromRequestParts (see `src/login/extract/login.rs`). As // a result, it can be used as an extractor. -#[derive(Debug)] +#[derive(Clone, Debug, serde::Serialize)] pub struct Login { pub id: Id, pub name: String, @@ -55,6 +55,24 @@ impl<'c> Logins<'c> { Ok(login) } + pub async fn by_id(&mut self, id: &Id) -> Result<Login, BoxedError> { + let login = sqlx::query_as!( + Login, + r#" + select + id as "id: Id", + name + from login + where id = $1 + "#, + id, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(login) + } + /// Retrieves a login by name, plus its stored password hash for /// verification. If there's no login with the requested name, this will /// return [None]. @@ -90,7 +108,7 @@ impl<'c> Logins<'c> { } /// Stable identifier for a [Login]. Prefixed with `L`. -#[derive(Clone, Debug, sqlx::Type, serde::Serialize)] +#[derive(Clone, Debug, Eq, PartialEq, sqlx::Type, serde::Serialize)] #[sqlx(transparent)] pub struct Id(BaseId); @@ -105,3 +123,9 @@ impl Id { BaseId::generate("L") } } + +impl std::fmt::Display for Id { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} |
