summaryrefslogtreecommitdiff
path: root/src/conversation/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/conversation/app.rs')
-rw-r--r--src/conversation/app.rs69
1 files changed, 44 insertions, 25 deletions
diff --git a/src/conversation/app.rs b/src/conversation/app.rs
index 81ccdcf..30baf77 100644
--- a/src/conversation/app.rs
+++ b/src/conversation/app.rs
@@ -3,7 +3,7 @@ use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
use super::{
- Conversation, Id,
+ Conversation, History, Id, history,
repo::{LoadError, Provider as _},
validate,
};
@@ -11,7 +11,7 @@ use crate::{
clock::DateTime,
db::{Duplicate as _, NotFound as _},
event::{Broadcaster, Event, Sequence, repo::Provider as _},
- message::{self, repo::Provider as _},
+ message::repo::Provider as _,
name::{self, Name},
};
@@ -36,15 +36,21 @@ impl<'a> Conversations<'a> {
let mut tx = self.db.begin().await?;
let created = tx.sequence().next(created_at).await?;
- let conversation = tx
- .conversations()
- .create(name, &created)
+ let conversation = History::begin(name, created);
+
+ // This filter technically includes every event in the history, but it's easier to follow if
+ // the various event-manipulating app methods are consistent, and it's harmless to have an
+ // always-satisfied filter.
+ let events = conversation.events().filter(Sequence::start_from(created));
+ tx.conversations()
+ .record_events(events.clone())
.await
.duplicate(|| CreateError::DuplicateName(name.clone()))?;
+
tx.commit().await?;
self.events
- .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>());
+ .broadcast(events.map(Event::from).collect::<Vec<_>>());
Ok(conversation.as_created())
}
@@ -78,11 +84,6 @@ impl<'a> Conversations<'a> {
.by_id(conversation)
.await
.not_found(|| DeleteError::NotFound(conversation.clone()))?;
- conversation
- .as_snapshot()
- .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?;
-
- let mut events = Vec::new();
let messages = tx.messages().live(&conversation).await?;
let has_messages = messages
@@ -94,17 +95,15 @@ impl<'a> Conversations<'a> {
}
let deleted = tx.sequence().next(deleted_at).await?;
- let conversation = tx.conversations().delete(&conversation, &deleted).await?;
- events.extend(
- conversation
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from),
- );
+ let conversation = conversation.delete(deleted)?;
+
+ let events = conversation.events().filter(Sequence::start_from(deleted));
+ tx.conversations().record_events(events.clone()).await?;
tx.commit().await?;
- self.events.broadcast(events);
+ self.events
+ .broadcast(events.map(Event::from).collect::<Vec<_>>());
Ok(())
}
@@ -120,12 +119,14 @@ impl<'a> Conversations<'a> {
let mut events = Vec::with_capacity(expired.len());
for conversation in expired {
let deleted = tx.sequence().next(relative_to).await?;
- let conversation = tx.conversations().delete(&conversation, &deleted).await?;
- events.push(
- conversation
- .events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
+ let conversation = conversation.delete(deleted)?;
+
+ let conversation_events = conversation.events().filter(Sequence::start_from(deleted));
+ tx.conversations()
+ .record_events(conversation_events.clone())
+ .await?;
+
+ events.push(conversation_events);
}
tx.commit().await?;
@@ -218,8 +219,18 @@ impl From<LoadError> for DeleteError {
}
}
+impl From<history::DeleteError> for DeleteError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()),
+ }
+ }
+}
+
#[derive(Debug, thiserror::Error)]
pub enum ExpireError {
+ #[error("tried to expire already-deleted conversation: {0}")]
+ Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
#[error(transparent)]
@@ -234,3 +245,11 @@ impl From<LoadError> for ExpireError {
}
}
}
+
+impl From<history::DeleteError> for ExpireError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()),
+ }
+ }
+}