diff options
| -rw-r--r-- | .sqlx/query-43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20.json | 38 | ||||
| -rw-r--r-- | .sqlx/query-5df9d6889d5e057c3260d4956cdb313786b458082db232919de1a5e5195df7ee.json (renamed from .sqlx/query-959ae6e1e8653e33c26ed4320cea93631841fe57e5be0a207bd24ae1dadd3bad.json) | 14 | ||||
| -rw-r--r-- | .sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json | 44 | ||||
| -rw-r--r-- | src/channel/app.rs | 97 | ||||
| -rw-r--r-- | src/channel/repo/messages.rs | 59 | ||||
| -rw-r--r-- | src/login/repo/logins.rs | 6 |
6 files changed, 103 insertions, 155 deletions
diff --git a/.sqlx/query-43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20.json b/.sqlx/query-43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20.json deleted file mode 100644 index 6368a4a..0000000 --- a/.sqlx/query-43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20.json +++ /dev/null @@ -1,38 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n -- channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n from message\n where channel = $1\n order by sent_at asc\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender: LoginId", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "body", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false - ] - }, - "hash": "43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20" -} diff --git a/.sqlx/query-959ae6e1e8653e33c26ed4320cea93631841fe57e5be0a207bd24ae1dadd3bad.json b/.sqlx/query-5df9d6889d5e057c3260d4956cdb313786b458082db232919de1a5e5195df7ee.json index 06c5e53..7fd7ae3 100644 --- a/.sqlx/query-959ae6e1e8653e33c26ed4320cea93631841fe57e5be0a207bd24ae1dadd3bad.json +++ b/.sqlx/query-5df9d6889d5e057c3260d4956cdb313786b458082db232919de1a5e5195df7ee.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n -- channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n ", + "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n body,\n sent_at as \"sent_at: DateTime\"\n ", "describe": { "columns": [ { @@ -9,18 +9,13 @@ "type_info": "Text" }, { - "name": "sender: LoginId", - "ordinal": 1, - "type_info": "Text" - }, - { "name": "body", - "ordinal": 2, + "ordinal": 1, "type_info": "Text" }, { "name": "sent_at: DateTime", - "ordinal": 3, + "ordinal": 2, "type_info": "Text" } ], @@ -30,9 +25,8 @@ "nullable": [ false, false, - false, false ] }, - "hash": "959ae6e1e8653e33c26ed4320cea93631841fe57e5be0a207bd24ae1dadd3bad" + "hash": "5df9d6889d5e057c3260d4956cdb313786b458082db232919de1a5e5195df7ee" } diff --git a/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json b/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json new file mode 100644 index 0000000..e5c0eae --- /dev/null +++ b/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n order by sent_at asc\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender_id: LoginId", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender_name", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597" +} diff --git a/src/channel/app.rs b/src/channel/app.rs index c060b23..e242c2f 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -11,13 +11,9 @@ use tokio_stream::wrappers::BroadcastStream; use super::repo::{ channels::{Id as ChannelId, Provider as _}, - messages::{Id as MessageId, Message as StoredMessage, Provider as _}, -}; -use crate::{ - clock::DateTime, - error::BoxedError, - login::repo::logins::{Login, Provider as _}, + messages::{BroadcastMessage, Provider as _}, }; +use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login}; pub struct Channels<'a> { db: &'a SqlitePool, @@ -50,7 +46,6 @@ impl<'a> Channels<'a> { .messages() .create(&login.id, channel, body, sent_at) .await?; - let message = Message::from_login(login, message); tx.commit().await?; self.broadcaster.broadcast(channel, message); @@ -60,84 +55,14 @@ impl<'a> Channels<'a> { pub async fn events( &self, channel: &ChannelId, - ) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> { + ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>>, BoxedError> { let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from); - let db = self.db.clone(); let mut tx = self.db.begin().await?; - let stored_messages = tx.messages().all(channel).await?; - let stored_messages = stream::iter(stored_messages).then(move |msg| { - // The exact series of moves and clones here is the result of trial - // and error, and is likely the best I can do, given: - // - // * This closure _can't_ keep a reference to self, for lifetime - // reasons; - // * The closure will be executed multiple times, so it can't give - // up `db`; and - // * The returned future can't keep a reference to `db` as doing - // so would allow refs to the closure's `db` to outlive the - // closure itself. - // - // Fortunately, cloning the pool is acceptable - sqlx pools were - // designed to be cloned and the only thing actually cloned is a - // single `Arc`. This whole chain of clones just ends up producing - // cheap handles to a single underlying "real" pool. - let db = db.clone(); - async move { - let mut tx = db.begin().await?; - let msg = Message::from_stored(&mut tx, msg).await?; - tx.commit().await?; - - Ok(msg) - } - }); + let stored_messages = tx.messages().for_replay(channel).await?; tx.commit().await?; - Ok(stored_messages.chain(live_messages)) - } -} - -#[derive(Clone, Debug, serde::Serialize)] -pub struct Message { - pub id: MessageId, - pub sender: Login, - pub body: String, - pub sent_at: DateTime, -} - -impl Message { - async fn from_stored( - tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, - message: StoredMessage, - ) -> Result<Self, BoxedError> { - let sender = tx.logins().by_id(&message.sender).await?; - - let message = Self { - sender, - id: message.id, - body: message.body, - sent_at: message.sent_at, - }; - - Ok(message) - } - - fn from_login(sender: &Login, message: StoredMessage) -> Self { - // Panic as this logic is enforced by the caller anyways. This "can't - // happen," other than via programming mistakes, and cannot be fixed - // by config changes or changing user behaviours. - assert_eq!( - message.sender, sender.id, - "broadcast message must have the same sender ({}) as the stored message ({})", - sender.id, message.sender, - ); - - Self { - sender: sender.clone(), - id: message.id, - body: message.body, - sent_at: message.sent_at, - } + Ok(stream::iter(stored_messages).map(Ok).chain(live_messages)) } } @@ -147,7 +72,7 @@ pub struct Broadcaster { // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's // own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that // lock it must be sync. - senders: Arc<Mutex<HashMap<ChannelId, Sender<Message>>>>, + senders: Arc<Mutex<HashMap<ChannelId, Sender<BroadcastMessage>>>>, } impl Broadcaster { @@ -186,7 +111,7 @@ impl Broadcaster { // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. - pub fn broadcast(&self, channel: &ChannelId, message: Message) { + pub fn broadcast(&self, channel: &ChannelId, message: BroadcastMessage) { let tx = self.sender(channel); // Per the Tokio docs, the returned error is only used to indicate that @@ -198,7 +123,7 @@ impl Broadcaster { // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. - pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<Message> { + pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<BroadcastMessage> { let rx = self.sender(channel).subscribe(); BroadcastStream::from(rx) @@ -206,15 +131,15 @@ impl Broadcaster { // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. - fn sender(&self, channel: &ChannelId) -> Sender<Message> { + fn sender(&self, channel: &ChannelId) -> Sender<BroadcastMessage> { self.senders()[channel].clone() } - fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<Message>>> { + fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<BroadcastMessage>>> { self.senders.lock().unwrap() // propagate panics when mutex is poisoned } - fn make_sender() -> Sender<Message> { + fn make_sender() -> Sender<BroadcastMessage> { // Queue depth of 16 chosen entirely arbitrarily. Don't read too much // into it. let (tx, _) = channel(16); diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs index 0d74ea9..fe833b6 100644 --- a/src/channel/repo/messages.rs +++ b/src/channel/repo/messages.rs @@ -4,7 +4,10 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use super::channels::Id as ChannelId; use crate::{ - clock::DateTime, error::BoxedError, id::Id as BaseId, login::repo::logins::Id as LoginId, + clock::DateTime, + error::BoxedError, + id::Id as BaseId, + login::repo::logins::{Id as LoginId, Login, Logins}, }; pub trait Provider { @@ -19,13 +22,10 @@ impl<'c> Provider for Transaction<'c, Sqlite> { pub struct Messages<'t>(&'t mut SqliteConnection); -#[derive(Clone, Debug)] -pub struct Message { +#[derive(Clone, Debug, serde::Serialize)] +pub struct BroadcastMessage { pub id: Id, - pub sender: LoginId, - // Field not actually used at this time, but you can reinstate it if you - // need to. It's not omitted out of any greater design intention. - // pub channel: ChannelId, + pub sender: Login, pub body: String, pub sent_at: DateTime, } @@ -37,50 +37,67 @@ impl<'c> Messages<'c> { channel: &ChannelId, body: &str, sent_at: &DateTime, - ) -> Result<Message, BoxedError> { + ) -> Result<BroadcastMessage, BoxedError> { let id = Id::generate(); - let message = sqlx::query_as!( - Message, + let sender = Logins::from(&mut *self.0).by_id(sender).await?; + + let message = sqlx::query!( r#" insert into message (id, sender, channel, body, sent_at) values ($1, $2, $3, $4, $5) returning id as "id: Id", - sender as "sender: LoginId", - -- channel as "channel: ChannelId", body, sent_at as "sent_at: DateTime" "#, id, - sender, + sender.id, channel, body, sent_at, ) + .map(|row| BroadcastMessage { + sender: sender.clone(), + id: row.id, + body: row.body, + sent_at: row.sent_at, + }) .fetch_one(&mut *self.0) .await?; Ok(message) } - pub async fn all(&mut self, channel: &ChannelId) -> Result<Vec<Message>, BoxedError> { - let messages = sqlx::query_as!( - Message, + pub async fn for_replay( + &mut self, + channel: &ChannelId, + ) -> Result<Vec<BroadcastMessage>, BoxedError> { + let messages = sqlx::query!( r#" select - id as "id: Id", - sender as "sender: LoginId", - -- channel as "channel: ChannelId", - body, - sent_at as "sent_at: DateTime" + message.id as "id: Id", + login.id as "sender_id: LoginId", + login.name as sender_name, + message.body, + message.sent_at as "sent_at: DateTime" from message + join login on message.sender = login.id where channel = $1 order by sent_at asc "#, channel, ) + .map(|row| BroadcastMessage { + id: row.id, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + body: row.body, + sent_at: row.sent_at, + }) .fetch_all(&mut *self.0) .await?; diff --git a/src/login/repo/logins.rs b/src/login/repo/logins.rs index e1c5057..b0c8ce4 100644 --- a/src/login/repo/logins.rs +++ b/src/login/repo/logins.rs @@ -107,6 +107,12 @@ impl<'c> Logins<'c> { } } +impl<'t> From<&'t mut SqliteConnection> for Logins<'t> { + fn from(tx: &'t mut SqliteConnection) -> Self { + Self(tx) + } +} + /// Stable identifier for a [Login]. Prefixed with `L`. #[derive(Clone, Debug, Eq, PartialEq, sqlx::Type, serde::Serialize)] #[sqlx(transparent)] |
