use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use super::{snapshot::Message, History, Id}; use crate::{ channel, clock::DateTime, event::{Instant, ResumePoint, Sequence}, login::{self, Login}, }; pub trait Provider { fn messages(&mut self) -> Messages; } impl<'c> Provider for Transaction<'c, Sqlite> { fn messages(&mut self) -> Messages { Messages(self) } } pub struct Messages<'t>(&'t mut SqliteConnection); impl<'c> Messages<'c> { pub async fn create( &mut self, channel: &channel::History, sender: &Login, sent: &Instant, body: &str, ) -> Result { let id = Id::generate(); let channel_id = channel.id(); let message = sqlx::query!( r#" insert into message (id, channel, sender, sent_at, sent_sequence, body) values ($1, $2, $3, $4, $5, $6) returning id as "id: Id", channel as "channel: channel::Id", sender as "sender: login::Id", sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence", body "#, id, channel_id, sender.id, sent.at, sent.sequence, body, ) .map(|row| History { message: Message { sent: Instant { at: row.sent_at, sequence: row.sent_sequence, }, channel: row.channel, sender: row.sender, id: row.id, body: row.body, }, deleted: None, }) .fetch_one(&mut *self.0) .await?; Ok(message) } pub async fn in_channel( &mut self, channel: &channel::History, resume_at: ResumePoint, ) -> Result, sqlx::Error> { let channel_id = channel.id(); let messages = sqlx::query!( r#" select channel as "channel: channel::Id", sender as "sender: login::Id", id as "id: Id", body, sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence" from message where channel = $1 and coalesce(sent_sequence <= $2, true) order by sent_sequence "#, channel_id, resume_at, ) .map(|row| History { message: Message { sent: Instant { at: row.sent_at, sequence: row.sent_sequence, }, channel: row.channel, sender: row.sender, id: row.id, body: row.body, }, deleted: None, }) .fetch_all(&mut *self.0) .await?; Ok(messages) } async fn by_id(&mut self, message: &Id) -> Result { let message = sqlx::query!( r#" select channel as "channel: channel::Id", sender as "sender: login::Id", id as "id: Id", body, sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence" from message where id = $1 "#, message, ) .map(|row| History { message: Message { sent: Instant { at: row.sent_at, sequence: row.sent_sequence, }, channel: row.channel, sender: row.sender, id: row.id, body: row.body, }, deleted: None, }) .fetch_one(&mut *self.0) .await?; Ok(message) } pub async fn delete( &mut self, message: &Id, deleted: &Instant, ) -> Result { let history = self.by_id(message).await?; sqlx::query_scalar!( r#" delete from message where id = $1 returning 1 as "deleted: i64" "#, history.message.id, ) .fetch_one(&mut *self.0) .await?; Ok(History { deleted: Some(*deleted), ..history }) } pub async fn expired(&mut self, expire_at: &DateTime) -> Result, sqlx::Error> { let messages = sqlx::query_scalar!( r#" select id as "message: Id" from message where sent_at < $1 "#, expire_at, ) .fetch_all(&mut *self.0) .await?; Ok(messages) } pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select channel as "channel: channel::Id", sender as "sender: login::Id", id as "id: Id", body, sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence" from message where coalesce(message.sent_sequence > $1, true) "#, resume_at, ) .map(|row| History { message: Message { sent: Instant { at: row.sent_at, sequence: row.sent_sequence, }, channel: row.channel, sender: row.sender, id: row.id, body: row.body, }, deleted: None, }) .fetch_all(&mut *self.0) .await?; Ok(messages) } }