diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-06-30 22:00:57 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-07-03 22:43:42 -0400 |
| commit | a15e3d580124f561864c6a39f1e035eb1b3aab13 (patch) | |
| tree | ef80f725e7b02547a23b5c29a482fbf3fd188c0d /src/conversation/repo.rs | |
| parent | 5af4aea1e2f143499529b70f9cf191c6994265c6 (diff) | |
Rename "channel" to "conversation" within the server.
I've split this from the schema and API changes because, frankly, it's huge. Annoyingly so. There are no semantic changes in this, it's all symbol changes, but there are a _lot_ of them because the term "channel" leaks all over everything in a service whose primary role is managing messages sent to channels (now, conversations).
I found a buggy test while working on this! It's not fixed in this commit, because it felt mean to hide a real change in the middle of this much chaff.
Diffstat (limited to 'src/conversation/repo.rs')
| -rw-r--r-- | src/conversation/repo.rs | 332 |
1 files changed, 332 insertions, 0 deletions
diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs new file mode 100644 index 0000000..82b5f01 --- /dev/null +++ b/src/conversation/repo.rs @@ -0,0 +1,332 @@ +use futures::stream::{StreamExt as _, TryStreamExt as _}; +use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; + +use crate::{ + clock::DateTime, + conversation::{Conversation, History, Id}, + db::NotFound, + event::{Instant, Sequence}, + name::{self, Name}, +}; + +pub trait Provider { + fn conversations(&mut self) -> Conversations; +} + +impl Provider for Transaction<'_, Sqlite> { + fn conversations(&mut self) -> Conversations { + Conversations(self) + } +} + +pub struct Conversations<'t>(&'t mut SqliteConnection); + +impl Conversations<'_> { + pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> { + let id = Id::generate(); + let name = name.clone(); + let display_name = name.display(); + let canonical_name = name.canonical(); + let created = *created; + + sqlx::query!( + r#" + insert into conversation (id, created_at, created_sequence, last_sequence) + values ($1, $2, $3, $4) + "#, + id, + created.at, + created.sequence, + created.sequence, + ) + .execute(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_name (id, display_name, canonical_name) + values ($1, $2, $3) + "#, + id, + display_name, + canonical_name, + ) + .execute(&mut *self.0) + .await?; + + let conversation = History { + conversation: Conversation { + created, + id, + name: name.clone(), + deleted_at: None, + }, + deleted: None, + }; + + Ok(conversation) + } + + pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> { + let conversation = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where id = $1 + "#, + conversation, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch_one(&mut *self.0) + .await??; + + Ok(conversation) + } + + pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { + let conversations = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where conversation.created_sequence <= $1 + order by name.canonical_name + "#, + resume_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } + + pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { + let conversations = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where conversation.last_sequence > $1 + "#, + resume_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } + + pub async fn delete( + &mut self, + conversation: &History, + deleted: &Instant, + ) -> Result<History, LoadError> { + let id = conversation.id(); + sqlx::query!( + r#" + update conversation + set last_sequence = max(last_sequence, $1) + where id = $2 + returning id as "id: Id" + "#, + deleted.sequence, + id, + ) + .fetch_one(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + deleted.at, + deleted.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a conversation is deleted, its + // name is retconned to have been the empty string. Someone reading the event + // stream afterwards, or looking at conversations via the API, cannot retrieve + // the "deleted" conversation's information by ignoring the deletion event. + sqlx::query!( + r#" + delete from conversation_name + where id = $1 + "#, + id, + ) + .execute(&mut *self.0) + .await?; + + let conversation = self.by_id(id).await?; + + Ok(conversation) + } + + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { + let conversations = sqlx::query_scalar!( + r#" + with has_messages as ( + select conversation + from message + group by conversation + ) + delete from conversation_deleted + where deleted_at < $1 + and id not in has_messages + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for conversation in conversations { + // Wanted: a way to batch these up into one query. + sqlx::query!( + r#" + delete from conversation + where id = $1 + "#, + conversation, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> { + let conversations = sqlx::query!( + r#" + select + conversation.id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + left join message + on conversation.id = message.conversation + where conversation.created_at < $1 + and message.id is null + and deleted.id is null + "#, + expired_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum LoadError { + Database(#[from] sqlx::Error), + Name(#[from] name::Error), +} + +impl<T> NotFound for Result<T, LoadError> { + type Ok = T; + type Error = LoadError; + + fn optional(self) -> Result<Option<T>, LoadError> { + match self { + Ok(value) => Ok(Some(value)), + Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None), + Err(other) => Err(other), + } + } +} |
