use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use super::{snapshot::Message, History, Id}; use crate::{ channel::{self, Channel}, clock::DateTime, event::{Instant, Sequence}, login::{self, Login}, }; pub trait Provider { fn messages(&mut self) -> Messages; } impl<'c> Provider for Transaction<'c, Sqlite> { fn messages(&mut self) -> Messages { Messages(self) } } pub struct Messages<'t>(&'t mut SqliteConnection); impl<'c> Messages<'c> { pub async fn create( &mut self, channel: &Channel, sender: &Login, sent: &Instant, body: &str, ) -> Result { let id = Id::generate(); let message = sqlx::query!( r#" insert into message (id, channel, sender, sent_at, sent_sequence, body) values ($1, $2, $3, $4, $5, $6) returning id as "id: Id", body "#, id, channel.id, sender.id, sent.at, sent.sequence, body, ) .map(|row| History { message: Message { sent: *sent, channel: channel.clone(), sender: sender.clone(), id: row.id, body: row.body, }, deleted: None, }) .fetch_one(&mut *self.0) .await?; Ok(message) } pub async fn in_channel( &mut self, channel: &Channel, resume_at: Option, ) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select channel.id as "channel_id: channel::Id", channel.name as "channel_name", sender.id as "sender_id: login::Id", sender.name as "sender_name", message.id as "id: Id", message.body, sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence" from message join channel on message.channel = channel.id join login as sender on message.sender = sender.id where channel.id = $1 and coalesce(message.sent_sequence <= $2, true) order by message.sent_sequence "#, channel.id, resume_at, ) .map(|row| History { message: Message { sent: Instant { at: row.sent_at, sequence: row.sent_sequence, }, channel: Channel { id: row.channel_id, name: row.channel_name, }, sender: Login { id: row.sender_id, name: row.sender_name, }, id: row.id, body: row.body, }, deleted: None, }) .fetch_all(&mut *self.0) .await?; Ok(messages) } async fn by_id(&mut self, message: &Id) -> Result { let message = sqlx::query!( r#" select channel.id as "channel_id: channel::Id", channel.name as "channel_name", sender.id as "sender_id: login::Id", sender.name as "sender_name", message.id as "id: Id", message.body, sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence" from message join channel on message.channel = channel.id join login as sender on message.sender = sender.id where message.id = $1 "#, message, ) .map(|row| History { message: Message { sent: Instant { at: row.sent_at, sequence: row.sent_sequence, }, channel: Channel { id: row.channel_id, name: row.channel_name, }, sender: Login { id: row.sender_id, name: row.sender_name, }, id: row.id, body: row.body, }, deleted: None, }) .fetch_one(&mut *self.0) .await?; Ok(message) } pub async fn delete( &mut self, message: &Id, deleted: &Instant, ) -> Result { let history = self.by_id(message).await?; sqlx::query_scalar!( r#" delete from message where id = $1 returning 1 as "deleted: i64" "#, history.message.id, ) .fetch_one(&mut *self.0) .await?; Ok(History { deleted: Some(*deleted), ..history }) } pub async fn expired(&mut self, expire_at: &DateTime) -> Result, sqlx::Error> { let messages = sqlx::query_scalar!( r#" select id as "message: Id" from message where sent_at < $1 "#, expire_at, ) .fetch_all(&mut *self.0) .await?; Ok(messages) } pub async fn replay( &mut self, resume_at: Option, ) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select channel.id as "channel_id: channel::Id", channel.name as "channel_name", sender.id as "sender_id: login::Id", sender.name as "sender_name", message.id as "id: Id", message.body, sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence" from message join channel on message.channel = channel.id join login as sender on message.sender = sender.id where coalesce(message.sent_sequence > $1, true) "#, resume_at, ) .map(|row| History { message: Message { sent: Instant { at: row.sent_at, sequence: row.sent_sequence, }, channel: Channel { id: row.channel_id, name: row.channel_name, }, sender: Login { id: row.sender_id, name: row.sender_name, }, id: row.id, body: row.body, }, deleted: None, }) .fetch_all(&mut *self.0) .await?; Ok(messages) } }