use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; use super::{ Body, Event, History, Id, Message, event::{Deleted, Sent}, }; use crate::{ clock::DateTime, conversation, event::{Instant, Sequence}, user, }; pub trait Provider { fn messages(&mut self) -> Messages<'_>; } impl Provider for Transaction<'_, Sqlite> { fn messages(&mut self) -> Messages<'_> { Messages(self) } } pub struct Messages<'t>(&'t mut SqliteConnection); impl Messages<'_> { pub async fn record_events( &mut self, events: impl IntoIterator, ) -> Result<(), sqlx::Error> { for event in events { self.record_event(&event).await?; } Ok(()) } pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { match event { Event::Sent(sent) => self.record_sent(sent).await, Event::Deleted(deleted) => self.record_deleted(deleted).await, } } async fn record_sent(&mut self, sent: &Sent) -> Result<(), sqlx::Error> { let Message { id, conversation, sender, body, sent, deleted: _, } = &sent.message; sqlx::query!( r#" insert into message (id, conversation, sender, body, sent_at, sent_sequence, last_sequence) values ($1, $2, $3, $4, $5, $6, $6) "#, id, conversation, sender, body, sent.at, sent.sequence, ) .execute(&mut *self.0) .await?; Ok(()) } async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> { let Deleted { instant, id } = deleted; sqlx::query!( r#" insert into message_deleted (id, deleted_at, deleted_sequence) values ($1, $2, $3) "#, id, instant.at, instant.sequence, ) .execute(&mut *self.0) .await?; // Small social responsibility hack here: when a message is deleted, its body is // retconned to have been the empty string. Someone reading the event stream // afterwards, or looking at messages in the conversation, cannot retrieve the // "deleted" message by ignoring the deletion event. sqlx::query!( r#" update message set body = '', last_sequence = max(last_sequence, $1) where id = $2 "#, instant.sequence, id, ) .execute(&mut *self.0) .await?; Ok(()) } pub async fn live( &mut self, conversation: &conversation::History, ) -> Result, sqlx::Error> { let conversation_id = conversation.id(); let messages = sqlx::query!( r#" select message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", id as "id: Id", message.body as "body: Body", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) where message.conversation = $1 and deleted.id is null "#, conversation_id, ) .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), conversation: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }, }) .fetch_all(&mut *self.0) .await?; Ok(messages) } pub async fn all(&mut self, resume_at: Sequence) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", message.id as "id: Id", message.body as "body: Body", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) where message.sent_sequence <= $1 order by message.sent_sequence "#, resume_at, ) .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), conversation: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }, }) .fetch_all(&mut *self.0) .await?; Ok(messages) } pub async fn by_id(&mut self, message: &Id) -> Result { let message = sqlx::query!( r#" select message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", id as "id: Id", message.body as "body: Body", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) where id = $1 "#, message, ) .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), conversation: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }, }) .fetch_one(&mut *self.0) .await?; Ok(message) } pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let messages = sqlx::query_scalar!( r#" delete from message_deleted where deleted_at < $1 returning id as "id: Id" "#, purge_at, ) .fetch_all(&mut *self.0) .await?; for message in messages { sqlx::query!( r#" delete from message where id = $1 "#, message, ) .execute(&mut *self.0) .await?; } Ok(()) } pub async fn expired(&mut self, expire_at: &DateTime) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select id as "id: Id", message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", message.body as "body: Body", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) where message.sent_at < $1 and deleted.id is null "#, expire_at, ) .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), id: row.id, conversation: row.conversation, sender: row.sender, body: row.body.unwrap_or_default(), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }, }) .fetch_all(&mut *self.0) .await?; Ok(messages) } pub async fn replay(&mut self, resume_at: Sequence) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select id as "id: Id", message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", message.body as "body: Body", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) where message.last_sequence > $1 "#, resume_at, ) .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), conversation: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }, }) .fetch_all(&mut *self.0) .await?; Ok(messages) } }