summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs89
1 files changed, 85 insertions, 4 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 7b02300..4aa2622 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -11,9 +11,13 @@ use tokio_stream::wrappers::BroadcastStream;
use super::repo::{
channels::{Id as ChannelId, Provider as _},
- messages::{Message, Provider as _},
+ messages::{Id as MessageId, Message as StoredMessage, Provider as _},
+};
+use crate::{
+ clock::DateTime,
+ error::BoxedError,
+ login::repo::logins::{Id as LoginId, Login, Provider as _},
};
-use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login};
pub struct Channels<'a> {
db: &'a SqlitePool,
@@ -46,6 +50,7 @@ impl<'a> Channels<'a> {
.messages()
.create(&login.id, channel, body, sent_at)
.await?;
+ let message = Message::from_login(login, message)?;
tx.commit().await?;
self.broadcaster.broadcast(channel, message)?;
@@ -58,16 +63,92 @@ impl<'a> Channels<'a> {
) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> {
let live_messages = self.broadcaster.listen(channel)?.map_err(BoxedError::from);
+ let db = self.db.clone();
let mut tx = self.db.begin().await?;
let stored_messages = tx.messages().all(channel).await?;
+ let stored_messages = stream::iter(stored_messages).then(move |msg| {
+ // The exact series of moves and clones here is the result of trial
+ // and error, and is likely the best I can do, given:
+ //
+ // * This closure _can't_ keep a reference to self, for lifetime
+ // reasons;
+ // * The closure will be executed multiple times, so it can't give
+ // up `db`; and
+ // * The returned future can't keep a reference to `db` as doing
+ // so would allow refs to the closure's `db` to outlive the
+ // closure itself.
+ //
+ // Fortunately, cloning the pool is acceptable - sqlx pools were
+ // designed to be cloned and the only thing actually cloned is a
+ // single `Arc`. This whole chain of clones just ends up producing
+ // cheap handles to a single underlying "real" pool.
+ let db = db.clone();
+ async move {
+ let mut tx = db.begin().await?;
+ let msg = Message::from_stored(&mut tx, msg).await?;
+ tx.commit().await?;
+
+ Ok(msg)
+ }
+ });
tx.commit().await?;
- let stored_messages = stream::iter(stored_messages.into_iter().map(Ok));
-
Ok(stored_messages.chain(live_messages))
}
}
+#[derive(Clone, Debug, serde::Serialize)]
+pub struct Message {
+ pub id: MessageId,
+ pub sender: Login,
+ pub body: String,
+ pub sent_at: DateTime,
+}
+
+impl Message {
+ async fn from_stored(
+ tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
+ message: StoredMessage,
+ ) -> Result<Self, BoxedError> {
+ let sender = tx.logins().by_id(&message.sender).await?;
+
+ let message = Self {
+ sender,
+ id: message.id,
+ body: message.body,
+ sent_at: message.sent_at,
+ };
+
+ Ok(message)
+ }
+
+ fn from_login(sender: &Login, message: StoredMessage) -> Result<Self, MessageError> {
+ if sender.id != message.sender {
+ // This functionally can't happen, but the funny thing about "This
+ // can never happen" comments is that they're usually wrong.
+ return Err(MessageError::LoginMismatched {
+ sender: sender.id.clone(),
+ message: message.sender,
+ });
+ }
+
+ let message = Self {
+ sender: sender.clone(),
+ id: message.id,
+ body: message.body,
+ sent_at: message.sent_at,
+ };
+
+ Ok(message)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+enum MessageError {
+ #[error("sender login id {sender} did not match message login id {message}")]
+ LoginMismatched { sender: LoginId, message: LoginId },
+}
+
// Clones will share the same senders collection.
#[derive(Clone)]
pub struct Broadcaster {