diff options
Diffstat (limited to 'src/conversation/app.rs')
| -rw-r--r-- | src/conversation/app.rs | 69 |
1 files changed, 44 insertions, 25 deletions
diff --git a/src/conversation/app.rs b/src/conversation/app.rs index 81ccdcf..30baf77 100644 --- a/src/conversation/app.rs +++ b/src/conversation/app.rs @@ -3,7 +3,7 @@ use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{ - Conversation, Id, + Conversation, History, Id, history, repo::{LoadError, Provider as _}, validate, }; @@ -11,7 +11,7 @@ use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, event::{Broadcaster, Event, Sequence, repo::Provider as _}, - message::{self, repo::Provider as _}, + message::repo::Provider as _, name::{self, Name}, }; @@ -36,15 +36,21 @@ impl<'a> Conversations<'a> { let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; - let conversation = tx - .conversations() - .create(name, &created) + let conversation = History::begin(name, created); + + // This filter technically includes every event in the history, but it's easier to follow if + // the various event-manipulating app methods are consistent, and it's harmless to have an + // always-satisfied filter. + let events = conversation.events().filter(Sequence::start_from(created)); + tx.conversations() + .record_events(events.clone()) .await .duplicate(|| CreateError::DuplicateName(name.clone()))?; + tx.commit().await?; self.events - .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>()); + .broadcast(events.map(Event::from).collect::<Vec<_>>()); Ok(conversation.as_created()) } @@ -78,11 +84,6 @@ impl<'a> Conversations<'a> { .by_id(conversation) .await .not_found(|| DeleteError::NotFound(conversation.clone()))?; - conversation - .as_snapshot() - .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?; - - let mut events = Vec::new(); let messages = tx.messages().live(&conversation).await?; let has_messages = messages @@ -94,17 +95,15 @@ impl<'a> Conversations<'a> { } let deleted = tx.sequence().next(deleted_at).await?; - let conversation = tx.conversations().delete(&conversation, &deleted).await?; - events.extend( - conversation - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from), - ); + let conversation = conversation.delete(deleted)?; + + let events = conversation.events().filter(Sequence::start_from(deleted)); + tx.conversations().record_events(events.clone()).await?; tx.commit().await?; - self.events.broadcast(events); + self.events + .broadcast(events.map(Event::from).collect::<Vec<_>>()); Ok(()) } @@ -120,12 +119,14 @@ impl<'a> Conversations<'a> { let mut events = Vec::with_capacity(expired.len()); for conversation in expired { let deleted = tx.sequence().next(relative_to).await?; - let conversation = tx.conversations().delete(&conversation, &deleted).await?; - events.push( - conversation - .events() - .filter(Sequence::start_from(deleted.sequence)), - ); + let conversation = conversation.delete(deleted)?; + + let conversation_events = conversation.events().filter(Sequence::start_from(deleted)); + tx.conversations() + .record_events(conversation_events.clone()) + .await?; + + events.push(conversation_events); } tx.commit().await?; @@ -218,8 +219,18 @@ impl From<LoadError> for DeleteError { } } +impl From<history::DeleteError> for DeleteError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), + } + } +} + #[derive(Debug, thiserror::Error)] pub enum ExpireError { + #[error("tried to expire already-deleted conversation: {0}")] + Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] @@ -234,3 +245,11 @@ impl From<LoadError> for ExpireError { } } } + +impl From<history::DeleteError> for ExpireError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), + } + } +} |
