summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/channel/app.rs97
-rw-r--r--src/channel/repo/messages.rs59
-rw-r--r--src/login/repo/logins.rs6
3 files changed, 55 insertions, 107 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index c060b23..e242c2f 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -11,13 +11,9 @@ use tokio_stream::wrappers::BroadcastStream;
use super::repo::{
channels::{Id as ChannelId, Provider as _},
- messages::{Id as MessageId, Message as StoredMessage, Provider as _},
-};
-use crate::{
- clock::DateTime,
- error::BoxedError,
- login::repo::logins::{Login, Provider as _},
+ messages::{BroadcastMessage, Provider as _},
};
+use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login};
pub struct Channels<'a> {
db: &'a SqlitePool,
@@ -50,7 +46,6 @@ impl<'a> Channels<'a> {
.messages()
.create(&login.id, channel, body, sent_at)
.await?;
- let message = Message::from_login(login, message);
tx.commit().await?;
self.broadcaster.broadcast(channel, message);
@@ -60,84 +55,14 @@ impl<'a> Channels<'a> {
pub async fn events(
&self,
channel: &ChannelId,
- ) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> {
+ ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>>, BoxedError> {
let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from);
- let db = self.db.clone();
let mut tx = self.db.begin().await?;
- let stored_messages = tx.messages().all(channel).await?;
- let stored_messages = stream::iter(stored_messages).then(move |msg| {
- // The exact series of moves and clones here is the result of trial
- // and error, and is likely the best I can do, given:
- //
- // * This closure _can't_ keep a reference to self, for lifetime
- // reasons;
- // * The closure will be executed multiple times, so it can't give
- // up `db`; and
- // * The returned future can't keep a reference to `db` as doing
- // so would allow refs to the closure's `db` to outlive the
- // closure itself.
- //
- // Fortunately, cloning the pool is acceptable - sqlx pools were
- // designed to be cloned and the only thing actually cloned is a
- // single `Arc`. This whole chain of clones just ends up producing
- // cheap handles to a single underlying "real" pool.
- let db = db.clone();
- async move {
- let mut tx = db.begin().await?;
- let msg = Message::from_stored(&mut tx, msg).await?;
- tx.commit().await?;
-
- Ok(msg)
- }
- });
+ let stored_messages = tx.messages().for_replay(channel).await?;
tx.commit().await?;
- Ok(stored_messages.chain(live_messages))
- }
-}
-
-#[derive(Clone, Debug, serde::Serialize)]
-pub struct Message {
- pub id: MessageId,
- pub sender: Login,
- pub body: String,
- pub sent_at: DateTime,
-}
-
-impl Message {
- async fn from_stored(
- tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
- message: StoredMessage,
- ) -> Result<Self, BoxedError> {
- let sender = tx.logins().by_id(&message.sender).await?;
-
- let message = Self {
- sender,
- id: message.id,
- body: message.body,
- sent_at: message.sent_at,
- };
-
- Ok(message)
- }
-
- fn from_login(sender: &Login, message: StoredMessage) -> Self {
- // Panic as this logic is enforced by the caller anyways. This "can't
- // happen," other than via programming mistakes, and cannot be fixed
- // by config changes or changing user behaviours.
- assert_eq!(
- message.sender, sender.id,
- "broadcast message must have the same sender ({}) as the stored message ({})",
- sender.id, message.sender,
- );
-
- Self {
- sender: sender.clone(),
- id: message.id,
- body: message.body,
- sent_at: message.sent_at,
- }
+ Ok(stream::iter(stored_messages).map(Ok).chain(live_messages))
}
}
@@ -147,7 +72,7 @@ pub struct Broadcaster {
// The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's
// own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that
// lock it must be sync.
- senders: Arc<Mutex<HashMap<ChannelId, Sender<Message>>>>,
+ senders: Arc<Mutex<HashMap<ChannelId, Sender<BroadcastMessage>>>>,
}
impl Broadcaster {
@@ -186,7 +111,7 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn broadcast(&self, channel: &ChannelId, message: Message) {
+ pub fn broadcast(&self, channel: &ChannelId, message: BroadcastMessage) {
let tx = self.sender(channel);
// Per the Tokio docs, the returned error is only used to indicate that
@@ -198,7 +123,7 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<Message> {
+ pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<BroadcastMessage> {
let rx = self.sender(channel).subscribe();
BroadcastStream::from(rx)
@@ -206,15 +131,15 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- fn sender(&self, channel: &ChannelId) -> Sender<Message> {
+ fn sender(&self, channel: &ChannelId) -> Sender<BroadcastMessage> {
self.senders()[channel].clone()
}
- fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<Message>>> {
+ fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<BroadcastMessage>>> {
self.senders.lock().unwrap() // propagate panics when mutex is poisoned
}
- fn make_sender() -> Sender<Message> {
+ fn make_sender() -> Sender<BroadcastMessage> {
// Queue depth of 16 chosen entirely arbitrarily. Don't read too much
// into it.
let (tx, _) = channel(16);
diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs
index 0d74ea9..fe833b6 100644
--- a/src/channel/repo/messages.rs
+++ b/src/channel/repo/messages.rs
@@ -4,7 +4,10 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use super::channels::Id as ChannelId;
use crate::{
- clock::DateTime, error::BoxedError, id::Id as BaseId, login::repo::logins::Id as LoginId,
+ clock::DateTime,
+ error::BoxedError,
+ id::Id as BaseId,
+ login::repo::logins::{Id as LoginId, Login, Logins},
};
pub trait Provider {
@@ -19,13 +22,10 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Messages<'t>(&'t mut SqliteConnection);
-#[derive(Clone, Debug)]
-pub struct Message {
+#[derive(Clone, Debug, serde::Serialize)]
+pub struct BroadcastMessage {
pub id: Id,
- pub sender: LoginId,
- // Field not actually used at this time, but you can reinstate it if you
- // need to. It's not omitted out of any greater design intention.
- // pub channel: ChannelId,
+ pub sender: Login,
pub body: String,
pub sent_at: DateTime,
}
@@ -37,50 +37,67 @@ impl<'c> Messages<'c> {
channel: &ChannelId,
body: &str,
sent_at: &DateTime,
- ) -> Result<Message, BoxedError> {
+ ) -> Result<BroadcastMessage, BoxedError> {
let id = Id::generate();
- let message = sqlx::query_as!(
- Message,
+ let sender = Logins::from(&mut *self.0).by_id(sender).await?;
+
+ let message = sqlx::query!(
r#"
insert into message
(id, sender, channel, body, sent_at)
values ($1, $2, $3, $4, $5)
returning
id as "id: Id",
- sender as "sender: LoginId",
- -- channel as "channel: ChannelId",
body,
sent_at as "sent_at: DateTime"
"#,
id,
- sender,
+ sender.id,
channel,
body,
sent_at,
)
+ .map(|row| BroadcastMessage {
+ sender: sender.clone(),
+ id: row.id,
+ body: row.body,
+ sent_at: row.sent_at,
+ })
.fetch_one(&mut *self.0)
.await?;
Ok(message)
}
- pub async fn all(&mut self, channel: &ChannelId) -> Result<Vec<Message>, BoxedError> {
- let messages = sqlx::query_as!(
- Message,
+ pub async fn for_replay(
+ &mut self,
+ channel: &ChannelId,
+ ) -> Result<Vec<BroadcastMessage>, BoxedError> {
+ let messages = sqlx::query!(
r#"
select
- id as "id: Id",
- sender as "sender: LoginId",
- -- channel as "channel: ChannelId",
- body,
- sent_at as "sent_at: DateTime"
+ message.id as "id: Id",
+ login.id as "sender_id: LoginId",
+ login.name as sender_name,
+ message.body,
+ message.sent_at as "sent_at: DateTime"
from message
+ join login on message.sender = login.id
where channel = $1
order by sent_at asc
"#,
channel,
)
+ .map(|row| BroadcastMessage {
+ id: row.id,
+ sender: Login {
+ id: row.sender_id,
+ name: row.sender_name,
+ },
+ body: row.body,
+ sent_at: row.sent_at,
+ })
.fetch_all(&mut *self.0)
.await?;
diff --git a/src/login/repo/logins.rs b/src/login/repo/logins.rs
index e1c5057..b0c8ce4 100644
--- a/src/login/repo/logins.rs
+++ b/src/login/repo/logins.rs
@@ -107,6 +107,12 @@ impl<'c> Logins<'c> {
}
}
+impl<'t> From<&'t mut SqliteConnection> for Logins<'t> {
+ fn from(tx: &'t mut SqliteConnection) -> Self {
+ Self(tx)
+ }
+}
+
/// Stable identifier for a [Login]. Prefixed with `L`.
#[derive(Clone, Debug, Eq, PartialEq, sqlx::Type, serde::Serialize)]
#[sqlx(transparent)]