diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-08-26 02:25:57 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-08-26 18:05:00 -0400 |
| commit | ca4ac1d0f12532c38d4041aba6ae50ae4093ae13 (patch) | |
| tree | 60b155ae0445e162b6b9d1c7763b88d4eaa1571a /src/conversation/app.rs | |
| parent | a54c548bf00f881f36d2adc3a6a2614b5f72f9ce (diff) | |
Store `Conversation` instances using their events.
This replaces the approach of having the repo type know about conversation lifecycle in detail. Instead, the repo type accepts events and applies them to the DB blindly. The SQL written to implement each event does, however, embed assumptions about what order events will happen in.
Diffstat (limited to 'src/conversation/app.rs')
| -rw-r--r-- | src/conversation/app.rs | 69 |
1 files changed, 44 insertions, 25 deletions
diff --git a/src/conversation/app.rs b/src/conversation/app.rs index 81ccdcf..30baf77 100644 --- a/src/conversation/app.rs +++ b/src/conversation/app.rs @@ -3,7 +3,7 @@ use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{ - Conversation, Id, + Conversation, History, Id, history, repo::{LoadError, Provider as _}, validate, }; @@ -11,7 +11,7 @@ use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, event::{Broadcaster, Event, Sequence, repo::Provider as _}, - message::{self, repo::Provider as _}, + message::repo::Provider as _, name::{self, Name}, }; @@ -36,15 +36,21 @@ impl<'a> Conversations<'a> { let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; - let conversation = tx - .conversations() - .create(name, &created) + let conversation = History::begin(name, created); + + // This filter technically includes every event in the history, but it's easier to follow if + // the various event-manipulating app methods are consistent, and it's harmless to have an + // always-satisfied filter. + let events = conversation.events().filter(Sequence::start_from(created)); + tx.conversations() + .record_events(events.clone()) .await .duplicate(|| CreateError::DuplicateName(name.clone()))?; + tx.commit().await?; self.events - .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>()); + .broadcast(events.map(Event::from).collect::<Vec<_>>()); Ok(conversation.as_created()) } @@ -78,11 +84,6 @@ impl<'a> Conversations<'a> { .by_id(conversation) .await .not_found(|| DeleteError::NotFound(conversation.clone()))?; - conversation - .as_snapshot() - .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?; - - let mut events = Vec::new(); let messages = tx.messages().live(&conversation).await?; let has_messages = messages @@ -94,17 +95,15 @@ impl<'a> Conversations<'a> { } let deleted = tx.sequence().next(deleted_at).await?; - let conversation = tx.conversations().delete(&conversation, &deleted).await?; - events.extend( - conversation - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from), - ); + let conversation = conversation.delete(deleted)?; + + let events = conversation.events().filter(Sequence::start_from(deleted)); + tx.conversations().record_events(events.clone()).await?; tx.commit().await?; - self.events.broadcast(events); + self.events + .broadcast(events.map(Event::from).collect::<Vec<_>>()); Ok(()) } @@ -120,12 +119,14 @@ impl<'a> Conversations<'a> { let mut events = Vec::with_capacity(expired.len()); for conversation in expired { let deleted = tx.sequence().next(relative_to).await?; - let conversation = tx.conversations().delete(&conversation, &deleted).await?; - events.push( - conversation - .events() - .filter(Sequence::start_from(deleted.sequence)), - ); + let conversation = conversation.delete(deleted)?; + + let conversation_events = conversation.events().filter(Sequence::start_from(deleted)); + tx.conversations() + .record_events(conversation_events.clone()) + .await?; + + events.push(conversation_events); } tx.commit().await?; @@ -218,8 +219,18 @@ impl From<LoadError> for DeleteError { } } +impl From<history::DeleteError> for DeleteError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), + } + } +} + #[derive(Debug, thiserror::Error)] pub enum ExpireError { + #[error("tried to expire already-deleted conversation: {0}")] + Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] @@ -234,3 +245,11 @@ impl From<LoadError> for ExpireError { } } } + +impl From<history::DeleteError> for ExpireError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), + } + } +} |
