diff options
Diffstat (limited to 'src/conversation/repo.rs')
| -rw-r--r-- | src/conversation/repo.rs | 138 |
1 files changed, 72 insertions, 66 deletions
diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs index 7e38b62..cb66bf8 100644 --- a/src/conversation/repo.rs +++ b/src/conversation/repo.rs @@ -1,9 +1,12 @@ use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; +use super::{ + Conversation, Event, History, Id, + event::{Created, Deleted}, +}; use crate::{ clock::DateTime, - conversation::{Conversation, History, Id}, db::NotFound, event::{Instant, Sequence}, name::{self, Name}, @@ -22,22 +25,41 @@ impl Provider for Transaction<'_, Sqlite> { pub struct Conversations<'t>(&'t mut SqliteConnection); impl Conversations<'_> { - pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> { - let id = Id::generate(); - let name = name.clone(); + pub async fn record_events( + &mut self, + events: impl IntoIterator<Item = Event>, + ) -> Result<(), sqlx::Error> { + for event in events { + self.record_event(&event).await?; + } + Ok(()) + } + + pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { + match event { + Event::Created(created) => self.record_created(created).await, + Event::Deleted(deleted) => self.record_deleted(deleted).await, + } + } + + async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> { + let Conversation { + id, + created, + name, + deleted: _, + } = &created.conversation; let display_name = name.display(); let canonical_name = name.canonical(); - let created = *created; sqlx::query!( r#" insert into conversation (id, created_at, created_sequence, last_sequence) - values ($1, $2, $3, $4) + values ($1, $2, $3, $3) "#, id, created.at, created.sequence, - created.sequence, ) .execute(&mut *self.0) .await?; @@ -54,16 +76,50 @@ impl Conversations<'_> { .execute(&mut *self.0) .await?; - let conversation = History { - conversation: Conversation { - created, - id, - name: name.clone(), - deleted: None, - }, - }; + Ok(()) + } - Ok(conversation) + async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> { + let Deleted { instant, id } = deleted; + sqlx::query!( + r#" + update conversation + set last_sequence = max(last_sequence, $1) + where id = $2 + "#, + instant.sequence, + id, + ) + .execute(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + instant.at, + instant.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a conversation is deleted, its + // name is retconned to have been the empty string. Someone reading the event + // stream afterwards, or looking at conversations via the API, cannot retrieve + // the "deleted" conversation's information by ignoring the deletion event. + sqlx::query!( + r#" + delete from conversation_name + where id = $1 + "#, + id, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) } pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> { @@ -179,56 +235,6 @@ impl Conversations<'_> { Ok(conversations) } - pub async fn delete( - &mut self, - conversation: &History, - deleted: &Instant, - ) -> Result<History, LoadError> { - let id = conversation.id(); - sqlx::query!( - r#" - update conversation - set last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - id, - ) - .fetch_one(&mut *self.0) - .await?; - - sqlx::query!( - r#" - insert into conversation_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a conversation is deleted, its - // name is retconned to have been the empty string. Someone reading the event - // stream afterwards, or looking at conversations via the API, cannot retrieve - // the "deleted" conversation's information by ignoring the deletion event. - sqlx::query!( - r#" - delete from conversation_name - where id = $1 - "#, - id, - ) - .execute(&mut *self.0) - .await?; - - let conversation = self.by_id(id).await?; - - Ok(conversation) - } - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let conversations = sqlx::query_scalar!( r#" |
