From a54c548bf00f881f36d2adc3a6a2614b5f72f9ce Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 02:23:55 -0400 Subject: Allow callers to pass `Instant`s to `Sequence` predicate constructors. --- src/conversation/history.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'src/conversation/history.rs') diff --git a/src/conversation/history.rs b/src/conversation/history.rs index 746a1b0..8821277 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -30,9 +30,7 @@ impl History { where S: Into, { - self.events() - .filter(Sequence::up_to(sequence.into())) - .collect() + self.events().filter(Sequence::up_to(sequence)).collect() } // Snapshot of this conversation as of all events recorded in this history. -- cgit v1.2.3 From ca4ac1d0f12532c38d4041aba6ae50ae4093ae13 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 02:25:57 -0400 Subject: Store `Conversation` instances using their events. This replaces the approach of having the repo type know about conversation lifecycle in detail. Instead, the repo type accepts events and applies them to the DB blindly. The SQL written to implement each event does, however, embed assumptions about what order events will happen in. --- ...cf58b1689d916fd1c209645c74e17a8da2102eada3.json | 12 ++ ...a9bde38547e626387dfe5b859f02ae1dbe171d5741.json | 12 -- ...3379a428eb461067b1769b8de60a73e2117a3cd11f.json | 12 ++ ...e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json | 20 --- src/conversation/app.rs | 69 +++++++---- src/conversation/history.rs | 40 +++++- src/conversation/repo.rs | 138 +++++++++++---------- 7 files changed, 178 insertions(+), 125 deletions(-) create mode 100644 .sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json delete mode 100644 .sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json create mode 100644 .sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json delete mode 100644 .sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json (limited to 'src/conversation/history.rs') diff --git a/.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json b/.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json new file mode 100644 index 0000000..46bfea0 --- /dev/null +++ b/.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n update conversation\n set last_sequence = max(last_sequence, $1)\n where id = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + }, + "hash": "17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3" +} diff --git a/.sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json b/.sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json deleted file mode 100644 index 9c62ca9..0000000 --- a/.sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert into conversation (id, created_at, created_sequence, last_sequence)\n values ($1, $2, $3, $4)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 4 - }, - "nullable": [] - }, - "hash": "1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741" -} diff --git a/.sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json b/.sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json new file mode 100644 index 0000000..3d33188 --- /dev/null +++ b/.sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n insert into conversation (id, created_at, created_sequence, last_sequence)\n values ($1, $2, $3, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f" +} diff --git a/.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json b/.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json deleted file mode 100644 index 9d212fa..0000000 --- a/.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n update conversation\n set last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - false - ] - }, - "hash": "85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24" -} diff --git a/src/conversation/app.rs b/src/conversation/app.rs index 81ccdcf..30baf77 100644 --- a/src/conversation/app.rs +++ b/src/conversation/app.rs @@ -3,7 +3,7 @@ use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{ - Conversation, Id, + Conversation, History, Id, history, repo::{LoadError, Provider as _}, validate, }; @@ -11,7 +11,7 @@ use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, event::{Broadcaster, Event, Sequence, repo::Provider as _}, - message::{self, repo::Provider as _}, + message::repo::Provider as _, name::{self, Name}, }; @@ -36,15 +36,21 @@ impl<'a> Conversations<'a> { let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; - let conversation = tx - .conversations() - .create(name, &created) + let conversation = History::begin(name, created); + + // This filter technically includes every event in the history, but it's easier to follow if + // the various event-manipulating app methods are consistent, and it's harmless to have an + // always-satisfied filter. + let events = conversation.events().filter(Sequence::start_from(created)); + tx.conversations() + .record_events(events.clone()) .await .duplicate(|| CreateError::DuplicateName(name.clone()))?; + tx.commit().await?; self.events - .broadcast(conversation.events().map(Event::from).collect::>()); + .broadcast(events.map(Event::from).collect::>()); Ok(conversation.as_created()) } @@ -78,11 +84,6 @@ impl<'a> Conversations<'a> { .by_id(conversation) .await .not_found(|| DeleteError::NotFound(conversation.clone()))?; - conversation - .as_snapshot() - .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?; - - let mut events = Vec::new(); let messages = tx.messages().live(&conversation).await?; let has_messages = messages @@ -94,17 +95,15 @@ impl<'a> Conversations<'a> { } let deleted = tx.sequence().next(deleted_at).await?; - let conversation = tx.conversations().delete(&conversation, &deleted).await?; - events.extend( - conversation - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from), - ); + let conversation = conversation.delete(deleted)?; + + let events = conversation.events().filter(Sequence::start_from(deleted)); + tx.conversations().record_events(events.clone()).await?; tx.commit().await?; - self.events.broadcast(events); + self.events + .broadcast(events.map(Event::from).collect::>()); Ok(()) } @@ -120,12 +119,14 @@ impl<'a> Conversations<'a> { let mut events = Vec::with_capacity(expired.len()); for conversation in expired { let deleted = tx.sequence().next(relative_to).await?; - let conversation = tx.conversations().delete(&conversation, &deleted).await?; - events.push( - conversation - .events() - .filter(Sequence::start_from(deleted.sequence)), - ); + let conversation = conversation.delete(deleted)?; + + let conversation_events = conversation.events().filter(Sequence::start_from(deleted)); + tx.conversations() + .record_events(conversation_events.clone()) + .await?; + + events.push(conversation_events); } tx.commit().await?; @@ -218,8 +219,18 @@ impl From for DeleteError { } } +impl From for DeleteError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), + } + } +} + #[derive(Debug, thiserror::Error)] pub enum ExpireError { + #[error("tried to expire already-deleted conversation: {0}")] + Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] @@ -234,3 +245,11 @@ impl From for ExpireError { } } } + +impl From for ExpireError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), + } + } +} diff --git a/src/conversation/history.rs b/src/conversation/history.rs index 8821277..5cba9ca 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -4,13 +4,49 @@ use super::{ Conversation, Id, event::{Created, Deleted, Event}, }; -use crate::event::Sequence; +use crate::{ + event::{Instant, Sequence}, + name::Name, +}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { pub conversation: Conversation, } +// Lifecycle interface +impl History { + pub fn begin(name: &Name, created: Instant) -> Self { + Self { + conversation: Conversation { + id: Id::generate(), + name: name.clone(), + created, + deleted: None, + }, + } + } + + pub fn delete(self, deleted: Instant) -> Result { + if self.conversation.deleted.is_none() { + Ok(Self { + conversation: Conversation { + deleted: Some(deleted), + ..self.conversation + }, + }) + } else { + Err(DeleteError::Deleted(self)) + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("conversation {} already deleted", .0.conversation.id)] + Deleted(History), +} + // State interface impl History { pub fn id(&self) -> &Id { @@ -41,7 +77,7 @@ impl History { // Event factories impl History { - pub fn events(&self) -> impl Iterator + use<> { + pub fn events(&self) -> impl Iterator + Clone + use<> { [self.created()] .into_iter() .merge_by(self.deleted(), Sequence::merge) diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs index 7e38b62..cb66bf8 100644 --- a/src/conversation/repo.rs +++ b/src/conversation/repo.rs @@ -1,9 +1,12 @@ use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; +use super::{ + Conversation, Event, History, Id, + event::{Created, Deleted}, +}; use crate::{ clock::DateTime, - conversation::{Conversation, History, Id}, db::NotFound, event::{Instant, Sequence}, name::{self, Name}, @@ -22,22 +25,41 @@ impl Provider for Transaction<'_, Sqlite> { pub struct Conversations<'t>(&'t mut SqliteConnection); impl Conversations<'_> { - pub async fn create(&mut self, name: &Name, created: &Instant) -> Result { - let id = Id::generate(); - let name = name.clone(); + pub async fn record_events( + &mut self, + events: impl IntoIterator, + ) -> Result<(), sqlx::Error> { + for event in events { + self.record_event(&event).await?; + } + Ok(()) + } + + pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { + match event { + Event::Created(created) => self.record_created(created).await, + Event::Deleted(deleted) => self.record_deleted(deleted).await, + } + } + + async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> { + let Conversation { + id, + created, + name, + deleted: _, + } = &created.conversation; let display_name = name.display(); let canonical_name = name.canonical(); - let created = *created; sqlx::query!( r#" insert into conversation (id, created_at, created_sequence, last_sequence) - values ($1, $2, $3, $4) + values ($1, $2, $3, $3) "#, id, created.at, created.sequence, - created.sequence, ) .execute(&mut *self.0) .await?; @@ -54,16 +76,50 @@ impl Conversations<'_> { .execute(&mut *self.0) .await?; - let conversation = History { - conversation: Conversation { - created, - id, - name: name.clone(), - deleted: None, - }, - }; + Ok(()) + } - Ok(conversation) + async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> { + let Deleted { instant, id } = deleted; + sqlx::query!( + r#" + update conversation + set last_sequence = max(last_sequence, $1) + where id = $2 + "#, + instant.sequence, + id, + ) + .execute(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + instant.at, + instant.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a conversation is deleted, its + // name is retconned to have been the empty string. Someone reading the event + // stream afterwards, or looking at conversations via the API, cannot retrieve + // the "deleted" conversation's information by ignoring the deletion event. + sqlx::query!( + r#" + delete from conversation_name + where id = $1 + "#, + id, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) } pub async fn by_id(&mut self, conversation: &Id) -> Result { @@ -179,56 +235,6 @@ impl Conversations<'_> { Ok(conversations) } - pub async fn delete( - &mut self, - conversation: &History, - deleted: &Instant, - ) -> Result { - let id = conversation.id(); - sqlx::query!( - r#" - update conversation - set last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - id, - ) - .fetch_one(&mut *self.0) - .await?; - - sqlx::query!( - r#" - insert into conversation_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a conversation is deleted, its - // name is retconned to have been the empty string. Someone reading the event - // stream afterwards, or looking at conversations via the API, cannot retrieve - // the "deleted" conversation's information by ignoring the deletion event. - sqlx::query!( - r#" - delete from conversation_name - where id = $1 - "#, - id, - ) - .execute(&mut *self.0) - .await?; - - let conversation = self.by_id(id).await?; - - Ok(conversation) - } - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let conversations = sqlx::query_scalar!( r#" -- cgit v1.2.3