use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; use crate::{ clock::DateTime, conversation::{Conversation, History, Id}, db::NotFound, event::{Instant, Sequence}, name::{self, Name}, }; pub trait Provider { fn conversations(&mut self) -> Conversations<'_>; } impl Provider for Transaction<'_, Sqlite> { fn conversations(&mut self) -> Conversations<'_> { Conversations(self) } } pub struct Conversations<'t>(&'t mut SqliteConnection); impl Conversations<'_> { pub async fn create(&mut self, name: &Name, created: &Instant) -> Result { let id = Id::generate(); let name = name.clone(); let display_name = name.display(); let canonical_name = name.canonical(); let created = *created; sqlx::query!( r#" insert into conversation (id, created_at, created_sequence, last_sequence) values ($1, $2, $3, $4) "#, id, created.at, created.sequence, created.sequence, ) .execute(&mut *self.0) .await?; sqlx::query!( r#" insert into conversation_name (id, display_name, canonical_name) values ($1, $2, $3) "#, id, display_name, canonical_name, ) .execute(&mut *self.0) .await?; let conversation = History { conversation: Conversation { created, id, name: name.clone(), deleted: None, }, }; Ok(conversation) } pub async fn by_id(&mut self, conversation: &Id) -> Result { let conversation = sqlx::query!( r#" select id as "id: Id", name.display_name as "display_name?: String", name.canonical_name as "canonical_name?: String", conversation.created_at as "created_at: DateTime", conversation.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from conversation left join conversation_name as name using (id) left join conversation_deleted as deleted using (id) where id = $1 "#, conversation, ) .map(|row| { Ok::<_, name::Error>(History { conversation: Conversation { created: Instant::new(row.created_at, row.created_sequence), id: row.id, name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }, }) }) .fetch_one(&mut *self.0) .await??; Ok(conversation) } pub async fn all(&mut self, resume_at: Sequence) -> Result, LoadError> { let conversations = sqlx::query!( r#" select id as "id: Id", name.display_name as "display_name?: String", name.canonical_name as "canonical_name?: String", conversation.created_at as "created_at: DateTime", conversation.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from conversation left join conversation_name as name using (id) left join conversation_deleted as deleted using (id) where conversation.created_sequence <= $1 order by name.canonical_name "#, resume_at, ) .map(|row| { Ok::<_, name::Error>(History { conversation: Conversation { created: Instant::new(row.created_at, row.created_sequence), id: row.id, name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }, }) }) .fetch(&mut *self.0) .map(|res| Ok::<_, LoadError>(res??)) .try_collect() .await?; Ok(conversations) } pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { let conversations = sqlx::query!( r#" select id as "id: Id", name.display_name as "display_name?: String", name.canonical_name as "canonical_name?: String", conversation.created_at as "created_at: DateTime", conversation.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from conversation left join conversation_name as name using (id) left join conversation_deleted as deleted using (id) where conversation.last_sequence > $1 "#, resume_at, ) .map(|row| { Ok::<_, name::Error>(History { conversation: Conversation { created: Instant::new(row.created_at, row.created_sequence), id: row.id, name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }, }) }) .fetch(&mut *self.0) .map(|res| Ok::<_, LoadError>(res??)) .try_collect() .await?; Ok(conversations) } pub async fn delete( &mut self, conversation: &History, deleted: &Instant, ) -> Result { let id = conversation.id(); sqlx::query!( r#" update conversation set last_sequence = max(last_sequence, $1) where id = $2 returning id as "id: Id" "#, deleted.sequence, id, ) .fetch_one(&mut *self.0) .await?; sqlx::query!( r#" insert into conversation_deleted (id, deleted_at, deleted_sequence) values ($1, $2, $3) "#, id, deleted.at, deleted.sequence, ) .execute(&mut *self.0) .await?; // Small social responsibility hack here: when a conversation is deleted, its // name is retconned to have been the empty string. Someone reading the event // stream afterwards, or looking at conversations via the API, cannot retrieve // the "deleted" conversation's information by ignoring the deletion event. sqlx::query!( r#" delete from conversation_name where id = $1 "#, id, ) .execute(&mut *self.0) .await?; let conversation = self.by_id(id).await?; Ok(conversation) } pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let conversations = sqlx::query_scalar!( r#" with has_messages as ( select conversation from message group by conversation ) delete from conversation_deleted where deleted_at < $1 and id not in has_messages returning id as "id: Id" "#, purge_at, ) .fetch_all(&mut *self.0) .await?; for conversation in conversations { // Wanted: a way to batch these up into one query. sqlx::query!( r#" delete from conversation where id = $1 "#, conversation, ) .execute(&mut *self.0) .await?; } Ok(()) } pub async fn expired(&mut self, expired_at: &DateTime) -> Result, LoadError> { let conversations = sqlx::query!( r#" select conversation.id as "id: Id", name.display_name as "display_name?: String", name.canonical_name as "canonical_name?: String", conversation.created_at as "created_at: DateTime", conversation.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from conversation left join conversation_name as name using (id) left join conversation_deleted as deleted using (id) left join message on conversation.id = message.conversation where conversation.created_at < $1 and message.id is null and deleted.id is null "#, expired_at, ) .map(|row| { Ok::<_, name::Error>(History { conversation: Conversation { created: Instant::new(row.created_at, row.created_sequence), id: row.id, name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }, }) }) .fetch(&mut *self.0) .map(|res| Ok::<_, LoadError>(res??)) .try_collect() .await?; Ok(conversations) } } #[derive(Debug, thiserror::Error)] #[error(transparent)] pub enum LoadError { Database(#[from] sqlx::Error), Name(#[from] name::Error), } impl NotFound for Result { type Ok = T; type Error = LoadError; fn optional(self) -> Result, LoadError> { match self { Ok(value) => Ok(Some(value)), Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None), Err(other) => Err(other), } } }