summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-17 19:06:20 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-17 19:06:20 -0400
commit921f38a73e5d58a5a6077477a8b52d2705798f55 (patch)
tree895eee6c31a9e6003e4d199e32eb04bcb32f8f11 /src/channel
parente6be82157fe718570aa13ab12803ee39083b8dff (diff)
Express record dependencies through types.
This provides a convenient place to _stick_ "not found" errors, though actually introducing them will come in a later commit.
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs13
-rw-r--r--src/channel/repo/broadcast.rs29
2 files changed, 20 insertions, 22 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index b0b63b3..29d9c09 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -55,13 +55,14 @@ impl<'a> Channels<'a> {
sent_at: &DateTime,
) -> Result<(), BoxedError> {
let mut tx = self.db.begin().await?;
+ let channel = tx.channels().by_id(channel).await?;
let message = tx
.broadcast()
- .create(&login.id, channel, body, sent_at)
+ .create(login, &channel, body, sent_at)
.await?;
tx.commit().await?;
- self.broadcaster.broadcast(channel, message);
+ self.broadcaster.broadcast(&channel.id, message);
Ok(())
}
@@ -71,6 +72,9 @@ impl<'a> Channels<'a> {
resume_at: Option<&DateTime>,
) -> Result<impl Stream<Item = Result<broadcast::Message, BoxedError>> + 'static, BoxedError>
{
+ let mut tx = self.db.begin().await?;
+ let channel = tx.channels().by_id(channel).await?;
+
fn skip_stale<E>(
resume_at: Option<&DateTime>,
) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<Result<bool, E>> {
@@ -85,12 +89,11 @@ impl<'a> Channels<'a> {
let live_messages = self
.broadcaster
- .listen(channel)
+ .listen(&channel.id)
.map_err(BoxedError::from)
.try_skip_while(skip_stale(resume_at));
- let mut tx = self.db.begin().await?;
- let stored_messages = tx.broadcast().replay(channel, resume_at).await?;
+ let stored_messages = tx.broadcast().replay(&channel, resume_at).await?;
tx.commit().await?;
let stored_messages = stream::iter(stored_messages).map(Ok);
diff --git a/src/channel/repo/broadcast.rs b/src/channel/repo/broadcast.rs
index 3ca7396..ff16cd0 100644
--- a/src/channel/repo/broadcast.rs
+++ b/src/channel/repo/broadcast.rs
@@ -3,8 +3,8 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use crate::{
clock::DateTime,
repo::{
- channel,
- login::{self, Login, Logins},
+ channel::Channel,
+ login::{self, Login},
message,
},
};
@@ -32,15 +32,13 @@ pub struct Message {
impl<'c> Broadcast<'c> {
pub async fn create(
&mut self,
- sender: &login::Id,
- channel: &channel::Id,
+ sender: &Login,
+ channel: &Channel,
body: &str,
sent_at: &DateTime,
) -> Result<Message, sqlx::Error> {
let id = message::Id::generate();
- let sender = Logins::from(&mut *self.0).by_id(sender).await?;
-
let message = sqlx::query!(
r#"
insert into message
@@ -54,18 +52,15 @@ impl<'c> Broadcast<'c> {
"#,
id,
sender.id,
- channel,
+ channel.id,
body,
sent_at,
)
- .map(|row| {
- debug_assert!(row.sender == sender.id);
- Message {
- id: row.id,
- sender: sender.clone(),
- body: row.body,
- sent_at: row.sent_at,
- }
+ .map(|row| Message {
+ id: row.id,
+ sender: sender.clone(),
+ body: row.body,
+ sent_at: row.sent_at,
})
.fetch_one(&mut *self.0)
.await?;
@@ -75,7 +70,7 @@ impl<'c> Broadcast<'c> {
pub async fn replay(
&mut self,
- channel: &channel::Id,
+ channel: &Channel,
resume_at: Option<&DateTime>,
) -> Result<Vec<Message>, sqlx::Error> {
let messages = sqlx::query!(
@@ -92,7 +87,7 @@ impl<'c> Broadcast<'c> {
and coalesce(sent_at > $2, true)
order by sent_at asc
"#,
- channel,
+ channel.id,
resume_at,
)
.map(|row| Message {