diff options
| -rw-r--r-- | .sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json | 12 | ||||
| -rw-r--r-- | .sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json (renamed from .sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json) | 6 | ||||
| -rw-r--r-- | .sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json | 20 | ||||
| -rw-r--r-- | src/conversation/app.rs | 69 | ||||
| -rw-r--r-- | src/conversation/history.rs | 40 | ||||
| -rw-r--r-- | src/conversation/repo.rs | 138 |
6 files changed, 169 insertions, 116 deletions
diff --git a/.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json b/.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json new file mode 100644 index 0000000..46bfea0 --- /dev/null +++ b/.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n update conversation\n set last_sequence = max(last_sequence, $1)\n where id = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + }, + "hash": "17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3" +} diff --git a/.sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json b/.sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json index 9c62ca9..3d33188 100644 --- a/.sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json +++ b/.sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json @@ -1,12 +1,12 @@ { "db_name": "SQLite", - "query": "\n insert into conversation (id, created_at, created_sequence, last_sequence)\n values ($1, $2, $3, $4)\n ", + "query": "\n insert into conversation (id, created_at, created_sequence, last_sequence)\n values ($1, $2, $3, $3)\n ", "describe": { "columns": [], "parameters": { - "Right": 4 + "Right": 3 }, "nullable": [] }, - "hash": "1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741" + "hash": "32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f" } diff --git a/.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json b/.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json deleted file mode 100644 index 9d212fa..0000000 --- a/.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n update conversation\n set last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - false - ] - }, - "hash": "85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24" -} diff --git a/src/conversation/app.rs b/src/conversation/app.rs index 81ccdcf..30baf77 100644 --- a/src/conversation/app.rs +++ b/src/conversation/app.rs @@ -3,7 +3,7 @@ use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{ - Conversation, Id, + Conversation, History, Id, history, repo::{LoadError, Provider as _}, validate, }; @@ -11,7 +11,7 @@ use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, event::{Broadcaster, Event, Sequence, repo::Provider as _}, - message::{self, repo::Provider as _}, + message::repo::Provider as _, name::{self, Name}, }; @@ -36,15 +36,21 @@ impl<'a> Conversations<'a> { let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; - let conversation = tx - .conversations() - .create(name, &created) + let conversation = History::begin(name, created); + + // This filter technically includes every event in the history, but it's easier to follow if + // the various event-manipulating app methods are consistent, and it's harmless to have an + // always-satisfied filter. + let events = conversation.events().filter(Sequence::start_from(created)); + tx.conversations() + .record_events(events.clone()) .await .duplicate(|| CreateError::DuplicateName(name.clone()))?; + tx.commit().await?; self.events - .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>()); + .broadcast(events.map(Event::from).collect::<Vec<_>>()); Ok(conversation.as_created()) } @@ -78,11 +84,6 @@ impl<'a> Conversations<'a> { .by_id(conversation) .await .not_found(|| DeleteError::NotFound(conversation.clone()))?; - conversation - .as_snapshot() - .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?; - - let mut events = Vec::new(); let messages = tx.messages().live(&conversation).await?; let has_messages = messages @@ -94,17 +95,15 @@ impl<'a> Conversations<'a> { } let deleted = tx.sequence().next(deleted_at).await?; - let conversation = tx.conversations().delete(&conversation, &deleted).await?; - events.extend( - conversation - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from), - ); + let conversation = conversation.delete(deleted)?; + + let events = conversation.events().filter(Sequence::start_from(deleted)); + tx.conversations().record_events(events.clone()).await?; tx.commit().await?; - self.events.broadcast(events); + self.events + .broadcast(events.map(Event::from).collect::<Vec<_>>()); Ok(()) } @@ -120,12 +119,14 @@ impl<'a> Conversations<'a> { let mut events = Vec::with_capacity(expired.len()); for conversation in expired { let deleted = tx.sequence().next(relative_to).await?; - let conversation = tx.conversations().delete(&conversation, &deleted).await?; - events.push( - conversation - .events() - .filter(Sequence::start_from(deleted.sequence)), - ); + let conversation = conversation.delete(deleted)?; + + let conversation_events = conversation.events().filter(Sequence::start_from(deleted)); + tx.conversations() + .record_events(conversation_events.clone()) + .await?; + + events.push(conversation_events); } tx.commit().await?; @@ -218,8 +219,18 @@ impl From<LoadError> for DeleteError { } } +impl From<history::DeleteError> for DeleteError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), + } + } +} + #[derive(Debug, thiserror::Error)] pub enum ExpireError { + #[error("tried to expire already-deleted conversation: {0}")] + Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] @@ -234,3 +245,11 @@ impl From<LoadError> for ExpireError { } } } + +impl From<history::DeleteError> for ExpireError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), + } + } +} diff --git a/src/conversation/history.rs b/src/conversation/history.rs index 8821277..5cba9ca 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -4,13 +4,49 @@ use super::{ Conversation, Id, event::{Created, Deleted, Event}, }; -use crate::event::Sequence; +use crate::{ + event::{Instant, Sequence}, + name::Name, +}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { pub conversation: Conversation, } +// Lifecycle interface +impl History { + pub fn begin(name: &Name, created: Instant) -> Self { + Self { + conversation: Conversation { + id: Id::generate(), + name: name.clone(), + created, + deleted: None, + }, + } + } + + pub fn delete(self, deleted: Instant) -> Result<Self, DeleteError> { + if self.conversation.deleted.is_none() { + Ok(Self { + conversation: Conversation { + deleted: Some(deleted), + ..self.conversation + }, + }) + } else { + Err(DeleteError::Deleted(self)) + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("conversation {} already deleted", .0.conversation.id)] + Deleted(History), +} + // State interface impl History { pub fn id(&self) -> &Id { @@ -41,7 +77,7 @@ impl History { // Event factories impl History { - pub fn events(&self) -> impl Iterator<Item = Event> + use<> { + pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> { [self.created()] .into_iter() .merge_by(self.deleted(), Sequence::merge) diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs index 7e38b62..cb66bf8 100644 --- a/src/conversation/repo.rs +++ b/src/conversation/repo.rs @@ -1,9 +1,12 @@ use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; +use super::{ + Conversation, Event, History, Id, + event::{Created, Deleted}, +}; use crate::{ clock::DateTime, - conversation::{Conversation, History, Id}, db::NotFound, event::{Instant, Sequence}, name::{self, Name}, @@ -22,22 +25,41 @@ impl Provider for Transaction<'_, Sqlite> { pub struct Conversations<'t>(&'t mut SqliteConnection); impl Conversations<'_> { - pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> { - let id = Id::generate(); - let name = name.clone(); + pub async fn record_events( + &mut self, + events: impl IntoIterator<Item = Event>, + ) -> Result<(), sqlx::Error> { + for event in events { + self.record_event(&event).await?; + } + Ok(()) + } + + pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { + match event { + Event::Created(created) => self.record_created(created).await, + Event::Deleted(deleted) => self.record_deleted(deleted).await, + } + } + + async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> { + let Conversation { + id, + created, + name, + deleted: _, + } = &created.conversation; let display_name = name.display(); let canonical_name = name.canonical(); - let created = *created; sqlx::query!( r#" insert into conversation (id, created_at, created_sequence, last_sequence) - values ($1, $2, $3, $4) + values ($1, $2, $3, $3) "#, id, created.at, created.sequence, - created.sequence, ) .execute(&mut *self.0) .await?; @@ -54,16 +76,50 @@ impl Conversations<'_> { .execute(&mut *self.0) .await?; - let conversation = History { - conversation: Conversation { - created, - id, - name: name.clone(), - deleted: None, - }, - }; + Ok(()) + } - Ok(conversation) + async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> { + let Deleted { instant, id } = deleted; + sqlx::query!( + r#" + update conversation + set last_sequence = max(last_sequence, $1) + where id = $2 + "#, + instant.sequence, + id, + ) + .execute(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + instant.at, + instant.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a conversation is deleted, its + // name is retconned to have been the empty string. Someone reading the event + // stream afterwards, or looking at conversations via the API, cannot retrieve + // the "deleted" conversation's information by ignoring the deletion event. + sqlx::query!( + r#" + delete from conversation_name + where id = $1 + "#, + id, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) } pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> { @@ -179,56 +235,6 @@ impl Conversations<'_> { Ok(conversations) } - pub async fn delete( - &mut self, - conversation: &History, - deleted: &Instant, - ) -> Result<History, LoadError> { - let id = conversation.id(); - sqlx::query!( - r#" - update conversation - set last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - id, - ) - .fetch_one(&mut *self.0) - .await?; - - sqlx::query!( - r#" - insert into conversation_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a conversation is deleted, its - // name is retconned to have been the empty string. Someone reading the event - // stream afterwards, or looking at conversations via the API, cannot retrieve - // the "deleted" conversation's information by ignoring the deletion event. - sqlx::query!( - r#" - delete from conversation_name - where id = $1 - "#, - id, - ) - .execute(&mut *self.0) - .await?; - - let conversation = self.by_id(id).await?; - - Ok(conversation) - } - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let conversations = sqlx::query_scalar!( r#" |
