use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ clock::DateTime, repo::{ channel::Channel, login::{self, Login}, message, }, }; pub trait Provider { fn broadcast(&mut self) -> Broadcast; } impl<'c> Provider for Transaction<'c, Sqlite> { fn broadcast(&mut self) -> Broadcast { Broadcast(self) } } pub struct Broadcast<'t>(&'t mut SqliteConnection); #[derive(Clone, Debug, serde::Serialize)] pub struct Message { pub id: message::Id, pub sender: Login, pub body: String, pub sent_at: DateTime, } impl<'c> Broadcast<'c> { pub async fn create( &mut self, sender: &Login, channel: &Channel, body: &str, sent_at: &DateTime, ) -> Result { let id = message::Id::generate(); let message = sqlx::query!( r#" insert into message (id, sender, channel, body, sent_at) values ($1, $2, $3, $4, $5) returning id as "id: message::Id", sender as "sender: login::Id", body, sent_at as "sent_at: DateTime" "#, id, sender.id, channel.id, body, sent_at, ) .map(|row| Message { id: row.id, sender: sender.clone(), body: row.body, sent_at: row.sent_at, }) .fetch_one(&mut *self.0) .await?; Ok(message) } pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { sqlx::query!( r#" delete from message where sent_at < $1 "#, expire_at, ) .execute(&mut *self.0) .await?; Ok(()) } pub async fn replay( &mut self, channel: &Channel, resume_at: Option<&DateTime>, ) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select message.id as "id: message::Id", login.id as "sender_id: login::Id", login.name as sender_name, message.body, message.sent_at as "sent_at: DateTime" from message join login on message.sender = login.id where channel = $1 and coalesce(sent_at > $2, true) order by sent_at asc "#, channel.id, resume_at, ) .map(|row| Message { id: row.id, sender: Login { id: row.sender_id, name: row.sender_name, }, body: row.body, sent_at: row.sent_at, }) .fetch_all(&mut *self.0) .await?; Ok(messages) } }