diff options
Diffstat (limited to 'src/conversation/repo.rs')
| -rw-r--r-- | src/conversation/repo.rs | 332 |
1 files changed, 332 insertions, 0 deletions
diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs new file mode 100644 index 0000000..82b5f01 --- /dev/null +++ b/src/conversation/repo.rs @@ -0,0 +1,332 @@ +use futures::stream::{StreamExt as _, TryStreamExt as _}; +use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; + +use crate::{ + clock::DateTime, + conversation::{Conversation, History, Id}, + db::NotFound, + event::{Instant, Sequence}, + name::{self, Name}, +}; + +pub trait Provider { + fn conversations(&mut self) -> Conversations; +} + +impl Provider for Transaction<'_, Sqlite> { + fn conversations(&mut self) -> Conversations { + Conversations(self) + } +} + +pub struct Conversations<'t>(&'t mut SqliteConnection); + +impl Conversations<'_> { + pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> { + let id = Id::generate(); + let name = name.clone(); + let display_name = name.display(); + let canonical_name = name.canonical(); + let created = *created; + + sqlx::query!( + r#" + insert into conversation (id, created_at, created_sequence, last_sequence) + values ($1, $2, $3, $4) + "#, + id, + created.at, + created.sequence, + created.sequence, + ) + .execute(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_name (id, display_name, canonical_name) + values ($1, $2, $3) + "#, + id, + display_name, + canonical_name, + ) + .execute(&mut *self.0) + .await?; + + let conversation = History { + conversation: Conversation { + created, + id, + name: name.clone(), + deleted_at: None, + }, + deleted: None, + }; + + Ok(conversation) + } + + pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> { + let conversation = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where id = $1 + "#, + conversation, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch_one(&mut *self.0) + .await??; + + Ok(conversation) + } + + pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { + let conversations = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where conversation.created_sequence <= $1 + order by name.canonical_name + "#, + resume_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } + + pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { + let conversations = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where conversation.last_sequence > $1 + "#, + resume_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } + + pub async fn delete( + &mut self, + conversation: &History, + deleted: &Instant, + ) -> Result<History, LoadError> { + let id = conversation.id(); + sqlx::query!( + r#" + update conversation + set last_sequence = max(last_sequence, $1) + where id = $2 + returning id as "id: Id" + "#, + deleted.sequence, + id, + ) + .fetch_one(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + deleted.at, + deleted.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a conversation is deleted, its + // name is retconned to have been the empty string. Someone reading the event + // stream afterwards, or looking at conversations via the API, cannot retrieve + // the "deleted" conversation's information by ignoring the deletion event. + sqlx::query!( + r#" + delete from conversation_name + where id = $1 + "#, + id, + ) + .execute(&mut *self.0) + .await?; + + let conversation = self.by_id(id).await?; + + Ok(conversation) + } + + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { + let conversations = sqlx::query_scalar!( + r#" + with has_messages as ( + select conversation + from message + group by conversation + ) + delete from conversation_deleted + where deleted_at < $1 + and id not in has_messages + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for conversation in conversations { + // Wanted: a way to batch these up into one query. + sqlx::query!( + r#" + delete from conversation + where id = $1 + "#, + conversation, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> { + let conversations = sqlx::query!( + r#" + select + conversation.id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + left join message + on conversation.id = message.conversation + where conversation.created_at < $1 + and message.id is null + and deleted.id is null + "#, + expired_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum LoadError { + Database(#[from] sqlx::Error), + Name(#[from] name::Error), +} + +impl<T> NotFound for Result<T, LoadError> { + type Ok = T; + type Error = LoadError; + + fn optional(self) -> Result<Option<T>, LoadError> { + match self { + Ok(value) => Ok(Some(value)), + Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None), + Err(other) => Err(other), + } + } +} |
