summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-13 01:26:56 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-13 02:42:27 -0400
commit3193a30ebcf6bafdeaf463eda0e7e82082dfe4b5 (patch)
tree2fb3ea84923aecf0ec1f820408bdc670b1247c95
parent067e3da1900d052a416c56e1c047640aa23441ae (diff)
Embed the sender's whole login (id and name) in messages, drop the redundant channel ID.
-rw-r--r--.sqlx/query-43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20.json (renamed from .sqlx/query-2fc4e7c9085fbd3c42a0d19e775a7f9dbeabf8094a1e781803f34a128af29075.json)14
-rw-r--r--.sqlx/query-8e407aa645ceb5a1010aaa469d57a5956342b185d001c94179d4172936554829.json26
-rw-r--r--.sqlx/query-959ae6e1e8653e33c26ed4320cea93631841fe57e5be0a207bd24ae1dadd3bad.json (renamed from .sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json)14
-rw-r--r--src/channel/app.rs89
-rw-r--r--src/channel/repo/messages.rs10
-rw-r--r--src/channel/routes.rs12
-rw-r--r--src/login/repo/logins.rs28
7 files changed, 157 insertions, 36 deletions
diff --git a/.sqlx/query-2fc4e7c9085fbd3c42a0d19e775a7f9dbeabf8094a1e781803f34a128af29075.json b/.sqlx/query-43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20.json
index f7e5590..6368a4a 100644
--- a/.sqlx/query-2fc4e7c9085fbd3c42a0d19e775a7f9dbeabf8094a1e781803f34a128af29075.json
+++ b/.sqlx/query-43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n select\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n from message\n where channel = $1\n order by sent_at asc\n ",
+ "query": "\n select\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n -- channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n from message\n where channel = $1\n order by sent_at asc\n ",
"describe": {
"columns": [
{
@@ -14,18 +14,13 @@
"type_info": "Text"
},
{
- "name": "channel: ChannelId",
- "ordinal": 2,
- "type_info": "Text"
- },
- {
"name": "body",
- "ordinal": 3,
+ "ordinal": 2,
"type_info": "Text"
},
{
"name": "sent_at: DateTime",
- "ordinal": 4,
+ "ordinal": 3,
"type_info": "Text"
}
],
@@ -36,9 +31,8 @@
false,
false,
false,
- false,
false
]
},
- "hash": "2fc4e7c9085fbd3c42a0d19e775a7f9dbeabf8094a1e781803f34a128af29075"
+ "hash": "43c6f8d2fba6620d5ef64ca26494cfd46365670959cd33e1fd52bca46b0fdc20"
}
diff --git a/.sqlx/query-8e407aa645ceb5a1010aaa469d57a5956342b185d001c94179d4172936554829.json b/.sqlx/query-8e407aa645ceb5a1010aaa469d57a5956342b185d001c94179d4172936554829.json
new file mode 100644
index 0000000..aa35e54
--- /dev/null
+++ b/.sqlx/query-8e407aa645ceb5a1010aaa469d57a5956342b185d001c94179d4172936554829.json
@@ -0,0 +1,26 @@
+{
+ "db_name": "SQLite",
+ "query": "\n select\n id as \"id: Id\",\n name\n from login\n where id = $1\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "id: Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ },
+ {
+ "name": "name",
+ "ordinal": 1,
+ "type_info": "Text"
+ }
+ ],
+ "parameters": {
+ "Right": 1
+ },
+ "nullable": [
+ false,
+ false
+ ]
+ },
+ "hash": "8e407aa645ceb5a1010aaa469d57a5956342b185d001c94179d4172936554829"
+}
diff --git a/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json b/.sqlx/query-959ae6e1e8653e33c26ed4320cea93631841fe57e5be0a207bd24ae1dadd3bad.json
index 93bbe5e..06c5e53 100644
--- a/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json
+++ b/.sqlx/query-959ae6e1e8653e33c26ed4320cea93631841fe57e5be0a207bd24ae1dadd3bad.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n ",
+ "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n -- channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n ",
"describe": {
"columns": [
{
@@ -14,18 +14,13 @@
"type_info": "Text"
},
{
- "name": "channel: ChannelId",
- "ordinal": 2,
- "type_info": "Text"
- },
- {
"name": "body",
- "ordinal": 3,
+ "ordinal": 2,
"type_info": "Text"
},
{
"name": "sent_at: DateTime",
- "ordinal": 4,
+ "ordinal": 3,
"type_info": "Text"
}
],
@@ -36,9 +31,8 @@
false,
false,
false,
- false,
false
]
},
- "hash": "9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e"
+ "hash": "959ae6e1e8653e33c26ed4320cea93631841fe57e5be0a207bd24ae1dadd3bad"
}
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 7b02300..4aa2622 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -11,9 +11,13 @@ use tokio_stream::wrappers::BroadcastStream;
use super::repo::{
channels::{Id as ChannelId, Provider as _},
- messages::{Message, Provider as _},
+ messages::{Id as MessageId, Message as StoredMessage, Provider as _},
+};
+use crate::{
+ clock::DateTime,
+ error::BoxedError,
+ login::repo::logins::{Id as LoginId, Login, Provider as _},
};
-use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login};
pub struct Channels<'a> {
db: &'a SqlitePool,
@@ -46,6 +50,7 @@ impl<'a> Channels<'a> {
.messages()
.create(&login.id, channel, body, sent_at)
.await?;
+ let message = Message::from_login(login, message)?;
tx.commit().await?;
self.broadcaster.broadcast(channel, message)?;
@@ -58,16 +63,92 @@ impl<'a> Channels<'a> {
) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> {
let live_messages = self.broadcaster.listen(channel)?.map_err(BoxedError::from);
+ let db = self.db.clone();
let mut tx = self.db.begin().await?;
let stored_messages = tx.messages().all(channel).await?;
+ let stored_messages = stream::iter(stored_messages).then(move |msg| {
+ // The exact series of moves and clones here is the result of trial
+ // and error, and is likely the best I can do, given:
+ //
+ // * This closure _can't_ keep a reference to self, for lifetime
+ // reasons;
+ // * The closure will be executed multiple times, so it can't give
+ // up `db`; and
+ // * The returned future can't keep a reference to `db` as doing
+ // so would allow refs to the closure's `db` to outlive the
+ // closure itself.
+ //
+ // Fortunately, cloning the pool is acceptable - sqlx pools were
+ // designed to be cloned and the only thing actually cloned is a
+ // single `Arc`. This whole chain of clones just ends up producing
+ // cheap handles to a single underlying "real" pool.
+ let db = db.clone();
+ async move {
+ let mut tx = db.begin().await?;
+ let msg = Message::from_stored(&mut tx, msg).await?;
+ tx.commit().await?;
+
+ Ok(msg)
+ }
+ });
tx.commit().await?;
- let stored_messages = stream::iter(stored_messages.into_iter().map(Ok));
-
Ok(stored_messages.chain(live_messages))
}
}
+#[derive(Clone, Debug, serde::Serialize)]
+pub struct Message {
+ pub id: MessageId,
+ pub sender: Login,
+ pub body: String,
+ pub sent_at: DateTime,
+}
+
+impl Message {
+ async fn from_stored(
+ tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
+ message: StoredMessage,
+ ) -> Result<Self, BoxedError> {
+ let sender = tx.logins().by_id(&message.sender).await?;
+
+ let message = Self {
+ sender,
+ id: message.id,
+ body: message.body,
+ sent_at: message.sent_at,
+ };
+
+ Ok(message)
+ }
+
+ fn from_login(sender: &Login, message: StoredMessage) -> Result<Self, MessageError> {
+ if sender.id != message.sender {
+ // This functionally can't happen, but the funny thing about "This
+ // can never happen" comments is that they're usually wrong.
+ return Err(MessageError::LoginMismatched {
+ sender: sender.id.clone(),
+ message: message.sender,
+ });
+ }
+
+ let message = Self {
+ sender: sender.clone(),
+ id: message.id,
+ body: message.body,
+ sent_at: message.sent_at,
+ };
+
+ Ok(message)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+enum MessageError {
+ #[error("sender login id {sender} did not match message login id {message}")]
+ LoginMismatched { sender: LoginId, message: LoginId },
+}
+
// Clones will share the same senders collection.
#[derive(Clone)]
pub struct Broadcaster {
diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs
index bdb0d29..0d74ea9 100644
--- a/src/channel/repo/messages.rs
+++ b/src/channel/repo/messages.rs
@@ -19,11 +19,13 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Messages<'t>(&'t mut SqliteConnection);
-#[derive(Clone, Debug, serde::Serialize)]
+#[derive(Clone, Debug)]
pub struct Message {
pub id: Id,
pub sender: LoginId,
- pub channel: ChannelId,
+ // Field not actually used at this time, but you can reinstate it if you
+ // need to. It's not omitted out of any greater design intention.
+ // pub channel: ChannelId,
pub body: String,
pub sent_at: DateTime,
}
@@ -47,7 +49,7 @@ impl<'c> Messages<'c> {
returning
id as "id: Id",
sender as "sender: LoginId",
- channel as "channel: ChannelId",
+ -- channel as "channel: ChannelId",
body,
sent_at as "sent_at: DateTime"
"#,
@@ -70,7 +72,7 @@ impl<'c> Messages<'c> {
select
id as "id: Id",
sender as "sender: LoginId",
- channel as "channel: ChannelId",
+ -- channel as "channel: ChannelId",
body,
sent_at as "sent_at: DateTime"
from message
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index 83c733c..0f95c69 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -8,10 +8,13 @@ use axum::{
routing::{get, post},
Router,
};
-use futures::stream::{StreamExt as _, TryStreamExt as _};
+use futures::{future, stream::TryStreamExt as _};
use super::repo::channels::Id as ChannelId;
-use crate::{app::App, clock::RequestedAt, error::InternalError, login::repo::logins::Login};
+use crate::{
+ app::App, clock::RequestedAt, error::BoxedError, error::InternalError,
+ login::repo::logins::Login,
+};
pub fn router() -> Router<App> {
Router::new()
@@ -63,10 +66,7 @@ async fn on_events(
.channels()
.events(&channel)
.await?
- .map(|msg| match msg {
- Ok(msg) => Ok(serde_json::to_string(&msg)?),
- Err(err) => Err(err),
- })
+ .and_then(|msg| future::ready(serde_json::to_string(&msg).map_err(BoxedError::from)))
.map_ok(|msg| sse::Event::default().data(&msg));
Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default()))
diff --git a/src/login/repo/logins.rs b/src/login/repo/logins.rs
index 142d8fb..e1c5057 100644
--- a/src/login/repo/logins.rs
+++ b/src/login/repo/logins.rs
@@ -18,7 +18,7 @@ pub struct Logins<'t>(&'t mut SqliteConnection);
// This also implements FromRequestParts (see `src/login/extract/login.rs`). As
// a result, it can be used as an extractor.
-#[derive(Debug)]
+#[derive(Clone, Debug, serde::Serialize)]
pub struct Login {
pub id: Id,
pub name: String,
@@ -55,6 +55,24 @@ impl<'c> Logins<'c> {
Ok(login)
}
+ pub async fn by_id(&mut self, id: &Id) -> Result<Login, BoxedError> {
+ let login = sqlx::query_as!(
+ Login,
+ r#"
+ select
+ id as "id: Id",
+ name
+ from login
+ where id = $1
+ "#,
+ id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(login)
+ }
+
/// Retrieves a login by name, plus its stored password hash for
/// verification. If there's no login with the requested name, this will
/// return [None].
@@ -90,7 +108,7 @@ impl<'c> Logins<'c> {
}
/// Stable identifier for a [Login]. Prefixed with `L`.
-#[derive(Clone, Debug, sqlx::Type, serde::Serialize)]
+#[derive(Clone, Debug, Eq, PartialEq, sqlx::Type, serde::Serialize)]
#[sqlx(transparent)]
pub struct Id(BaseId);
@@ -105,3 +123,9 @@ impl Id {
BaseId::generate("L")
}
}
+
+impl std::fmt::Display for Id {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ self.0.fmt(f)
+ }
+}