use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use super::{snapshot::Message, Body, History, Id}; use crate::{ channel, clock::DateTime, event::{Instant, Sequence}, login::{self, Login}, }; pub trait Provider { fn messages(&mut self) -> Messages; } impl<'c> Provider for Transaction<'c, Sqlite> { fn messages(&mut self) -> Messages { Messages(self) } } pub struct Messages<'t>(&'t mut SqliteConnection); impl<'c> Messages<'c> { pub async fn create( &mut self, channel: &channel::History, sender: &Login, sent: &Instant, body: &Body, ) -> Result { let id = Id::generate(); let channel_id = channel.id(); let message = sqlx::query!( r#" insert into message (id, channel, sender, sent_at, sent_sequence, body, last_sequence) values ($1, $2, $3, $4, $5, $6, $7) returning id as "id: Id", channel as "channel: channel::Id", sender as "sender: login::Id", sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence", body as "body: Body" "#, id, channel_id, sender.id, sent.at, sent.sequence, body, sent.sequence, ) .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), deleted_at: None, }, deleted: None, }) .fetch_one(&mut *self.0) .await?; Ok(message) } pub async fn live(&mut self, channel: &channel::History) -> Result, sqlx::Error> { let channel_id = channel.id(); let messages = sqlx::query!( r#" select message.channel as "channel: channel::Id", message.sender as "sender: login::Id", id as "id: Id", message.body as "body: Body", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) where message.channel = $1 and deleted.id is null "#, channel_id, ) .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), deleted_at: row.deleted_at, }, deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; Ok(messages) } pub async fn all(&mut self, resume_at: Sequence) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select message.channel as "channel: channel::Id", message.sender as "sender: login::Id", message.id as "id: Id", message.body as "body: Body", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) where message.sent_sequence <= $1 order by message.sent_sequence "#, resume_at, ) .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), deleted_at: row.deleted_at, }, deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; Ok(messages) } pub async fn by_id(&mut self, message: &Id) -> Result { let message = sqlx::query!( r#" select message.channel as "channel: channel::Id", message.sender as "sender: login::Id", id as "id: Id", message.body as "body: Body", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) where id = $1 "#, message, ) .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), deleted_at: row.deleted_at, }, deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_one(&mut *self.0) .await?; Ok(message) } pub async fn delete( &mut self, message: &History, deleted: &Instant, ) -> Result { let id = message.id(); sqlx::query!( r#" insert into message_deleted (id, deleted_at, deleted_sequence) values ($1, $2, $3) "#, id, deleted.at, deleted.sequence, ) .execute(&mut *self.0) .await?; // Small social responsibility hack here: when a message is deleted, its body is // retconned to have been the empty string. Someone reading the event stream // afterwards, or looking at messages in the channel, cannot retrieve the // "deleted" message by ignoring the deletion event. sqlx::query!( r#" update message set body = '', last_sequence = max(last_sequence, $1) where id = $2 returning id as "id: Id" "#, deleted.sequence, id, ) .fetch_one(&mut *self.0) .await?; let message = self.by_id(id).await?; Ok(message) } pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let messages = sqlx::query_scalar!( r#" delete from message_deleted where deleted_at < $1 returning id as "id: Id" "#, purge_at, ) .fetch_all(&mut *self.0) .await?; for message in messages { sqlx::query!( r#" delete from message where id = $1 "#, message, ) .execute(&mut *self.0) .await?; } Ok(()) } pub async fn expired(&mut self, expire_at: &DateTime) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select id as "id: Id", message.channel as "channel: channel::Id", message.sender as "sender: login::Id", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", message.body as "body: Body", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) where message.sent_at < $1 and deleted.id is null "#, expire_at, ) .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), id: row.id, channel: row.channel, sender: row.sender, body: row.body.unwrap_or_default(), deleted_at: row.deleted_at, }, deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; Ok(messages) } pub async fn replay(&mut self, resume_at: Sequence) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select id as "id: Id", message.channel as "channel: channel::Id", message.sender as "sender: login::Id", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", message.body as "body: Body", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) where message.last_sequence > $1 "#, resume_at, ) .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), deleted_at: row.deleted_at, }, deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; Ok(messages) } }