author     Owen Jacobson <owen@grimoire.ca>    2024-09-13 21:19:40 -0400
committer  Owen Jacobson <owen@grimoire.ca>    2024-09-13 23:12:31 -0400
commit     388a3d5a925aef7ff39339454ae0d720e05f038e
tree       672595cd179eeca7c9e61606704e453202f9efb0
parent     72daa3f510ea381b7e3606c75f03ce6000dc780c
Generate the required structure for broadcasting from a join, not from O(n) queries.
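
In rough terms, replaying a channel used to run one query for the messages and then one
login lookup per message; the replacement resolves the sender in the same query. The
sketch below paraphrases the two shapes (the per-message lookup is schematic, standing in
for Logins::by_id, whose SQL is not part of this diff):

    -- before: one query for the channel's messages...
    select id, sender, body, sent_at
      from message
     where channel = $1
     order by sent_at asc;
    -- ...then, for each returned row, one lookup to resolve the sender
    select id, name
      from login
     where id = $1;

    -- after: a single join returns the sender alongside each message
    select message.id,
           login.id   as sender_id,
           login.name as sender_name,
           message.body,
           message.sent_at
      from message
      join login on message.sender = login.id
     where channel = $1
     order by sent_at asc;
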
-rw-r--r--  .sqlx/query-43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20.json  38
-rw-r--r--  .sqlx/query-5df9d6889d5e057c3260d4956cdb313786b458082db232919de1a5e5195df7ee.json (renamed from .sqlx/query-959ae6e1e8653e33c26ed4320cea93631841fe57e5be0a207bd24ae1dadd3bad.json)  14
-rw-r--r--  .sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json  44
-rw-r--r--  src/channel/app.rs  97
-rw-r--r--  src/channel/repo/messages.rs  59
-rw-r--r--  src/login/repo/logins.rs  6
6 files changed, 103 insertions, 155 deletions
diff --git a/.sqlx/query-43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20.json b/.sqlx/query-43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20.json
deleted file mode 100644
index 6368a4a..0000000
--- a/.sqlx/query-43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20.json
+++ /dev/null
@@ -1,38 +0,0 @@
-{
- "db_name": "SQLite",
- "query": "\n select\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n -- channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n from message\n where channel = $1\n order by sent_at asc\n ",
- "describe": {
- "columns": [
- {
- "name": "id: Id",
- "ordinal": 0,
- "type_info": "Text"
- },
- {
- "name": "sender: LoginId",
- "ordinal": 1,
- "type_info": "Text"
- },
- {
- "name": "body",
- "ordinal": 2,
- "type_info": "Text"
- },
- {
- "name": "sent_at: DateTime",
- "ordinal": 3,
- "type_info": "Text"
- }
- ],
- "parameters": {
- "Right": 1
- },
- "nullable": [
- false,
- false,
- false,
- false
- ]
- },
- "hash": "43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20"
-}
diff --git a/.sqlx/query-959ae6e1e8653e33c26ed4320cea93631841fe57e5be0a207bd24ae1dadd3bad.json b/.sqlx/query-5df9d6889d5e057c3260d4956cdb313786b458082db232919de1a5e5195df7ee.json
index 06c5e53..7fd7ae3 100644
--- a/.sqlx/query-959ae6e1e8653e33c26ed4320cea93631841fe57e5be0a207bd24ae1dadd3bad.json
+++ b/.sqlx/query-5df9d6889d5e057c3260d4956cdb313786b458082db232919de1a5e5195df7ee.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n -- channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n ",
+ "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n body,\n sent_at as \"sent_at: DateTime\"\n ",
"describe": {
"columns": [
{
@@ -9,18 +9,13 @@
"type_info": "Text"
},
{
- "name": "sender: LoginId",
- "ordinal": 1,
- "type_info": "Text"
- },
- {
"name": "body",
- "ordinal": 2,
+ "ordinal": 1,
"type_info": "Text"
},
{
"name": "sent_at: DateTime",
- "ordinal": 3,
+ "ordinal": 2,
"type_info": "Text"
}
],
@@ -30,9 +25,8 @@
"nullable": [
false,
false,
- false,
false
]
},
- "hash": "959ae6e1e8653e33c26ed4320cea93631841fe57e5be0a207bd24ae1dadd3bad"
+ "hash": "5df9d6889d5e057c3260d4956cdb313786b458082db232919de1a5e5195df7ee"
}
diff --git a/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json b/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json
new file mode 100644
index 0000000..e5c0eae
--- /dev/null
+++ b/.sqlx/query-d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597.json
@@ -0,0 +1,44 @@
+{
+ "db_name": "SQLite",
+ "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n order by sent_at asc\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "id: Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ },
+ {
+ "name": "sender_id: LoginId",
+ "ordinal": 1,
+ "type_info": "Text"
+ },
+ {
+ "name": "sender_name",
+ "ordinal": 2,
+ "type_info": "Text"
+ },
+ {
+ "name": "body",
+ "ordinal": 3,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent_at: DateTime",
+ "ordinal": 4,
+ "type_info": "Text"
+ }
+ ],
+ "parameters": {
+ "Right": 1
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "d4a496f6348550cd2c94c2235d82b01b29f864f64daf1bcb19162b1eb26df597"
+}
diff --git a/src/channel/app.rs b/src/channel/app.rs
index c060b23..e242c2f 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -11,13 +11,9 @@ use tokio_stream::wrappers::BroadcastStream;
use super::repo::{
channels::{Id as ChannelId, Provider as _},
- messages::{Id as MessageId, Message as StoredMessage, Provider as _},
-};
-use crate::{
- clock::DateTime,
- error::BoxedError,
- login::repo::logins::{Login, Provider as _},
+ messages::{BroadcastMessage, Provider as _},
};
+use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login};
pub struct Channels<'a> {
db: &'a SqlitePool,
@@ -50,7 +46,6 @@ impl<'a> Channels<'a> {
.messages()
.create(&login.id, channel, body, sent_at)
.await?;
- let message = Message::from_login(login, message);
tx.commit().await?;
self.broadcaster.broadcast(channel, message);
@@ -60,84 +55,14 @@ impl<'a> Channels<'a> {
pub async fn events(
&self,
channel: &ChannelId,
- ) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> {
+ ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>>, BoxedError> {
let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from);
- let db = self.db.clone();
let mut tx = self.db.begin().await?;
- let stored_messages = tx.messages().all(channel).await?;
- let stored_messages = stream::iter(stored_messages).then(move |msg| {
- // The exact series of moves and clones here is the result of trial
- // and error, and is likely the best I can do, given:
- //
- // * This closure _can't_ keep a reference to self, for lifetime
- // reasons;
- // * The closure will be executed multiple times, so it can't give
- // up `db`; and
- // * The returned future can't keep a reference to `db` as doing
- // so would allow refs to the closure's `db` to outlive the
- // closure itself.
- //
- // Fortunately, cloning the pool is acceptable - sqlx pools were
- // designed to be cloned and the only thing actually cloned is a
- // single `Arc`. This whole chain of clones just ends up producing
- // cheap handles to a single underlying "real" pool.
- let db = db.clone();
- async move {
- let mut tx = db.begin().await?;
- let msg = Message::from_stored(&mut tx, msg).await?;
- tx.commit().await?;
-
- Ok(msg)
- }
- });
+ let stored_messages = tx.messages().for_replay(channel).await?;
tx.commit().await?;
- Ok(stored_messages.chain(live_messages))
- }
-}
-
-#[derive(Clone, Debug, serde::Serialize)]
-pub struct Message {
- pub id: MessageId,
- pub sender: Login,
- pub body: String,
- pub sent_at: DateTime,
-}
-
-impl Message {
- async fn from_stored(
- tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
- message: StoredMessage,
- ) -> Result<Self, BoxedError> {
- let sender = tx.logins().by_id(&message.sender).await?;
-
- let message = Self {
- sender,
- id: message.id,
- body: message.body,
- sent_at: message.sent_at,
- };
-
- Ok(message)
- }
-
- fn from_login(sender: &Login, message: StoredMessage) -> Self {
- // Panic as this logic is enforced by the caller anyways. This "can't
- // happen," other than via programming mistakes, and cannot be fixed
- // by config changes or changing user behaviours.
- assert_eq!(
- message.sender, sender.id,
- "broadcast message must have the same sender ({}) as the stored message ({})",
- sender.id, message.sender,
- );
-
- Self {
- sender: sender.clone(),
- id: message.id,
- body: message.body,
- sent_at: message.sent_at,
- }
+ Ok(stream::iter(stored_messages).map(Ok).chain(live_messages))
}
}
@@ -147,7 +72,7 @@ pub struct Broadcaster {
// The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's
// own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that
// lock it must be sync.
- senders: Arc<Mutex<HashMap<ChannelId, Sender<Message>>>>,
+ senders: Arc<Mutex<HashMap<ChannelId, Sender<BroadcastMessage>>>>,
}
impl Broadcaster {
@@ -186,7 +111,7 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn broadcast(&self, channel: &ChannelId, message: Message) {
+ pub fn broadcast(&self, channel: &ChannelId, message: BroadcastMessage) {
let tx = self.sender(channel);
// Per the Tokio docs, the returned error is only used to indicate that
@@ -198,7 +123,7 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<Message> {
+ pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<BroadcastMessage> {
let rx = self.sender(channel).subscribe();
BroadcastStream::from(rx)
@@ -206,15 +131,15 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- fn sender(&self, channel: &ChannelId) -> Sender<Message> {
+ fn sender(&self, channel: &ChannelId) -> Sender<BroadcastMessage> {
self.senders()[channel].clone()
}
- fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<Message>>> {
+ fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<BroadcastMessage>>> {
self.senders.lock().unwrap() // propagate panics when mutex is poisoned
}
- fn make_sender() -> Sender<Message> {
+ fn make_sender() -> Sender<BroadcastMessage> {
// Queue depth of 16 chosen entirely arbitrarily. Don't read too much
// into it.
let (tx, _) = channel(16);
diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs
index 0d74ea9..fe833b6 100644
--- a/src/channel/repo/messages.rs
+++ b/src/channel/repo/messages.rs
@@ -4,7 +4,10 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use super::channels::Id as ChannelId;
use crate::{
- clock::DateTime, error::BoxedError, id::Id as BaseId, login::repo::logins::Id as LoginId,
+ clock::DateTime,
+ error::BoxedError,
+ id::Id as BaseId,
+ login::repo::logins::{Id as LoginId, Login, Logins},
};
pub trait Provider {
@@ -19,13 +22,10 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Messages<'t>(&'t mut SqliteConnection);
-#[derive(Clone, Debug)]
-pub struct Message {
+#[derive(Clone, Debug, serde::Serialize)]
+pub struct BroadcastMessage {
pub id: Id,
- pub sender: LoginId,
- // Field not actually used at this time, but you can reinstate it if you
- // need to. It's not omitted out of any greater design intention.
- // pub channel: ChannelId,
+ pub sender: Login,
pub body: String,
pub sent_at: DateTime,
}
@@ -37,50 +37,67 @@ impl<'c> Messages<'c> {
channel: &ChannelId,
body: &str,
sent_at: &DateTime,
- ) -> Result<Message, BoxedError> {
+ ) -> Result<BroadcastMessage, BoxedError> {
let id = Id::generate();
- let message = sqlx::query_as!(
- Message,
+ let sender = Logins::from(&mut *self.0).by_id(sender).await?;
+
+ let message = sqlx::query!(
r#"
insert into message
(id, sender, channel, body, sent_at)
values ($1, $2, $3, $4, $5)
returning
id as "id: Id",
- sender as "sender: LoginId",
- -- channel as "channel: ChannelId",
body,
sent_at as "sent_at: DateTime"
"#,
id,
- sender,
+ sender.id,
channel,
body,
sent_at,
)
+ .map(|row| BroadcastMessage {
+ sender: sender.clone(),
+ id: row.id,
+ body: row.body,
+ sent_at: row.sent_at,
+ })
.fetch_one(&mut *self.0)
.await?;
Ok(message)
}
- pub async fn all(&mut self, channel: &ChannelId) -> Result<Vec<Message>, BoxedError> {
- let messages = sqlx::query_as!(
- Message,
+ pub async fn for_replay(
+ &mut self,
+ channel: &ChannelId,
+ ) -> Result<Vec<BroadcastMessage>, BoxedError> {
+ let messages = sqlx::query!(
r#"
select
- id as "id: Id",
- sender as "sender: LoginId",
- -- channel as "channel: ChannelId",
- body,
- sent_at as "sent_at: DateTime"
+ message.id as "id: Id",
+ login.id as "sender_id: LoginId",
+ login.name as sender_name,
+ message.body,
+ message.sent_at as "sent_at: DateTime"
from message
+ join login on message.sender = login.id
where channel = $1
order by sent_at asc
"#,
channel,
)
+ .map(|row| BroadcastMessage {
+ id: row.id,
+ sender: Login {
+ id: row.sender_id,
+ name: row.sender_name,
+ },
+ body: row.body,
+ sent_at: row.sent_at,
+ })
.fetch_all(&mut *self.0)
.await?;
diff --git a/src/login/repo/logins.rs b/src/login/repo/logins.rs
index e1c5057..b0c8ce4 100644
--- a/src/login/repo/logins.rs
+++ b/src/login/repo/logins.rs
@@ -107,6 +107,12 @@ impl<'c> Logins<'c> {
}
}
+impl<'t> From<&'t mut SqliteConnection> for Logins<'t> {
+ fn from(tx: &'t mut SqliteConnection) -> Self {
+ Self(tx)
+ }
+}
+
/// Stable identifier for a [Login]. Prefixed with `L`.
#[derive(Clone, Debug, Eq, PartialEq, sqlx::Type, serde::Serialize)]
#[sqlx(transparent)]