Diffstat (limited to 'src/channel')
-rw-r--r--   src/channel/app.rs             89
-rw-r--r--   src/channel/repo/messages.rs   10
-rw-r--r--   src/channel/routes.rs          12
3 files changed, 97 insertions, 14 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 7b02300..4aa2622 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -11,9 +11,13 @@ use tokio_stream::wrappers::BroadcastStream;
use super::repo::{
channels::{Id as ChannelId, Provider as _},
- messages::{Message, Provider as _},
+ messages::{Id as MessageId, Message as StoredMessage, Provider as _},
+};
+use crate::{
+ clock::DateTime,
+ error::BoxedError,
+ login::repo::logins::{Id as LoginId, Login, Provider as _},
};
-use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login};
pub struct Channels<'a> {
db: &'a SqlitePool,
@@ -46,6 +50,7 @@ impl<'a> Channels<'a> {
.messages()
.create(&login.id, channel, body, sent_at)
.await?;
+ let message = Message::from_login(login, message)?;
tx.commit().await?;
self.broadcaster.broadcast(channel, message)?;
@@ -58,16 +63,92 @@ impl<'a> Channels<'a> {
) -> Result<impl Stream<Item = Result<Message, BoxedError>>, BoxedError> {
let live_messages = self.broadcaster.listen(channel)?.map_err(BoxedError::from);
+ let db = self.db.clone();
let mut tx = self.db.begin().await?;
let stored_messages = tx.messages().all(channel).await?;
+ let stored_messages = stream::iter(stored_messages).then(move |msg| {
+ // The exact series of moves and clones here is the result of trial
+ // and error, and is likely the best I can do, given:
+ //
+ // * This closure _can't_ keep a reference to self, for lifetime
+ // reasons;
+ // * The closure will be executed multiple times, so it can't give
+ // up `db`; and
+ // * The returned future can't keep a reference to `db` as doing
+ // so would allow refs to the closure's `db` to outlive the
+ // closure itself.
+ //
+ // Fortunately, cloning the pool is acceptable - sqlx pools were
+ // designed to be cloned and the only thing actually cloned is a
+ // single `Arc`. This whole chain of clones just ends up producing
+ // cheap handles to a single underlying "real" pool.
+ let db = db.clone();
+ async move {
+ let mut tx = db.begin().await?;
+ let msg = Message::from_stored(&mut tx, msg).await?;
+ tx.commit().await?;
+
+ Ok(msg)
+ }
+ });
tx.commit().await?;
- let stored_messages = stream::iter(stored_messages.into_iter().map(Ok));
-
Ok(stored_messages.chain(live_messages))
}
}
+#[derive(Clone, Debug, serde::Serialize)]
+pub struct Message {
+ pub id: MessageId,
+ pub sender: Login,
+ pub body: String,
+ pub sent_at: DateTime,
+}
+
+impl Message {
+ async fn from_stored(
+ tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
+ message: StoredMessage,
+ ) -> Result<Self, BoxedError> {
+ let sender = tx.logins().by_id(&message.sender).await?;
+
+ let message = Self {
+ sender,
+ id: message.id,
+ body: message.body,
+ sent_at: message.sent_at,
+ };
+
+ Ok(message)
+ }
+
+ fn from_login(sender: &Login, message: StoredMessage) -> Result<Self, MessageError> {
+ if sender.id != message.sender {
+ // This functionally can't happen, but the funny thing about "This
+ // can never happen" comments is that they're usually wrong.
+ return Err(MessageError::LoginMismatched {
+ sender: sender.id.clone(),
+ message: message.sender,
+ });
+ }
+
+ let message = Self {
+ sender: sender.clone(),
+ id: message.id,
+ body: message.body,
+ sent_at: message.sent_at,
+ };
+
+ Ok(message)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+enum MessageError {
+ #[error("sender login id {sender} did not match message login id {message}")]
+ LoginMismatched { sender: LoginId, message: LoginId },
+}
+
// Clones will share the same senders collection.
#[derive(Clone)]
pub struct Broadcaster {
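Aside on the comment in `events` above: it relies on `SqlitePool` being a cheap, Arc-backed handle, so each invocation of the closure can give its returned future a clone of its own. Below is a minimal, self-contained sketch of that clone-per-item pattern, using a plain `Arc<str>` in place of the pool; the names and values are illustrative only and not part of this codebase.

use std::sync::Arc;

use futures::{stream, StreamExt as _};

fn main() {
    // Stand-in for the SqlitePool: cloning only bumps a reference count.
    let db: Arc<str> = Arc::from("pool");

    let items = stream::iter(1u32..=3).then(move |n| {
        // The closure keeps `db`; each returned future gets its own clone,
        // so the future never borrows from the closure that produced it.
        let db = Arc::clone(&db);
        async move { format!("{db}:{n}") }
    });

    let out: Vec<String> = futures::executor::block_on(items.collect());
    assert_eq!(out, ["pool:1", "pool:2", "pool:3"]);
}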
diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs
index bdb0d29..0d74ea9 100644
--- a/src/channel/repo/messages.rs
+++ b/src/channel/repo/messages.rs
@@ -19,11 +19,13 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Messages<'t>(&'t mut SqliteConnection);
-#[derive(Clone, Debug, serde::Serialize)]
+#[derive(Clone, Debug)]
pub struct Message {
pub id: Id,
pub sender: LoginId,
- pub channel: ChannelId,
+ // Field not actually used at this time, but you can reinstate it if you
+ // need to. It's not omitted out of any greater design intention.
+ // pub channel: ChannelId,
pub body: String,
pub sent_at: DateTime,
}
@@ -47,7 +49,7 @@ impl<'c> Messages<'c> {
returning
id as "id: Id",
sender as "sender: LoginId",
- channel as "channel: ChannelId",
+ -- channel as "channel: ChannelId",
body,
sent_at as "sent_at: DateTime"
"#,
@@ -70,7 +72,7 @@ impl<'c> Messages<'c> {
select
id as "id: Id",
sender as "sender: LoginId",
- channel as "channel: ChannelId",
+ -- channel as "channel: ChannelId",
body,
sent_at as "sent_at: DateTime"
from message
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index 83c733c..0f95c69 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -8,10 +8,13 @@ use axum::{
routing::{get, post},
Router,
};
-use futures::stream::{StreamExt as _, TryStreamExt as _};
+use futures::{future, stream::TryStreamExt as _};
use super::repo::channels::Id as ChannelId;
-use crate::{app::App, clock::RequestedAt, error::InternalError, login::repo::logins::Login};
+use crate::{
+ app::App, clock::RequestedAt, error::BoxedError, error::InternalError,
+ login::repo::logins::Login,
+};
pub fn router() -> Router<App> {
Router::new()
@@ -63,10 +66,7 @@ async fn on_events(
.channels()
.events(&channel)
.await?
- .map(|msg| match msg {
- Ok(msg) => Ok(serde_json::to_string(&msg)?),
- Err(err) => Err(err),
- })
+ .and_then(|msg| future::ready(serde_json::to_string(&msg).map_err(BoxedError::from)))
.map_ok(|msg| sse::Event::default().data(&msg));
Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default()))
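For reference, a standalone sketch of the `TryStreamExt::and_then` + `future::ready` combination the new `on_events` body uses: the synchronous `serde_json::to_string` call is applied only to `Ok` items of a fallible stream, and errors pass through unchanged. Plain integers stand in for messages and `String` for the error type; none of these names come from the codebase.

use futures::{future, stream, StreamExt as _, TryStreamExt as _};

fn main() {
    // A fallible stream: Ok items get serialized, Err items pass through.
    let events = stream::iter(vec![Ok::<u32, String>(1), Err("lost".into()), Ok(2)]);

    let serialized = events.and_then(|n| {
        // future::ready wraps the synchronous serde_json call so it satisfies
        // and_then's "returns a future" contract without an async block.
        future::ready(serde_json::to_string(&n).map_err(|e| e.to_string()))
    });

    let out: Vec<Result<String, String>> = futures::executor::block_on(serialized.collect());
    assert_eq!(out, vec![Ok("1".into()), Err("lost".into()), Ok("2".into())]);
}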