summaryrefslogtreecommitdiff
path: root/src/conversation/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/conversation/app.rs')
-rw-r--r--src/conversation/app.rs87
1 files changed, 51 insertions, 36 deletions
diff --git a/src/conversation/app.rs b/src/conversation/app.rs
index 81ccdcf..26886af 100644
--- a/src/conversation/app.rs
+++ b/src/conversation/app.rs
@@ -3,15 +3,15 @@ use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
use super::{
- Conversation, Id,
+ Conversation, History, Id, history,
repo::{LoadError, Provider as _},
validate,
};
use crate::{
clock::DateTime,
db::{Duplicate as _, NotFound as _},
- event::{Broadcaster, Event, Sequence, repo::Provider as _},
- message::{self, repo::Provider as _},
+ event::{Broadcaster, Sequence, repo::Provider as _},
+ message::repo::Provider as _,
name::{self, Name},
};
@@ -36,15 +36,20 @@ impl<'a> Conversations<'a> {
let mut tx = self.db.begin().await?;
let created = tx.sequence().next(created_at).await?;
- let conversation = tx
- .conversations()
- .create(name, &created)
+ let conversation = History::begin(name, created);
+
+ // This filter technically includes every event in the history, but it's easier to follow if
+ // the various event-manipulating app methods are consistent, and it's harmless to have an
+ // always-satisfied filter.
+ let events = conversation.events().filter(Sequence::start_from(created));
+ tx.conversations()
+ .record_events(events.clone())
.await
.duplicate(|| CreateError::DuplicateName(name.clone()))?;
+
tx.commit().await?;
- self.events
- .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>());
+ self.events.broadcast_from(events);
Ok(conversation.as_created())
}
@@ -78,33 +83,28 @@ impl<'a> Conversations<'a> {
.by_id(conversation)
.await
.not_found(|| DeleteError::NotFound(conversation.clone()))?;
- conversation
- .as_snapshot()
- .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?;
-
- let mut events = Vec::new();
let messages = tx.messages().live(&conversation).await?;
+ let deleted_at = tx.sequence().next(deleted_at).await?;
+
let has_messages = messages
.iter()
- .map(message::History::as_snapshot)
+ .map(|message| message.as_of(deleted_at))
.any(|message| message.is_some());
if has_messages {
return Err(DeleteError::NotEmpty(conversation.id().clone()));
}
- let deleted = tx.sequence().next(deleted_at).await?;
- let conversation = tx.conversations().delete(&conversation, &deleted).await?;
- events.extend(
- conversation
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from),
- );
+ let conversation = conversation.delete(deleted_at)?;
+
+ let events = conversation
+ .events()
+ .filter(Sequence::start_from(deleted_at));
+ tx.conversations().record_events(events.clone()).await?;
tx.commit().await?;
- self.events.broadcast(events);
+ self.events.broadcast_from(events);
Ok(())
}
@@ -120,23 +120,20 @@ impl<'a> Conversations<'a> {
let mut events = Vec::with_capacity(expired.len());
for conversation in expired {
let deleted = tx.sequence().next(relative_to).await?;
- let conversation = tx.conversations().delete(&conversation, &deleted).await?;
- events.push(
- conversation
- .events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
+ let conversation = conversation.delete(deleted)?;
+
+ let conversation_events = conversation.events().filter(Sequence::start_from(deleted));
+ tx.conversations()
+ .record_events(conversation_events.clone())
+ .await?;
+
+ events.push(conversation_events);
}
tx.commit().await?;
- self.events.broadcast(
- events
- .into_iter()
- .kmerge_by(Sequence::merge)
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ self.events
+ .broadcast_from(events.into_iter().kmerge_by(Sequence::merge));
Ok(())
}
@@ -218,8 +215,18 @@ impl From<LoadError> for DeleteError {
}
}
+impl From<history::DeleteError> for DeleteError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()),
+ }
+ }
+}
+
#[derive(Debug, thiserror::Error)]
pub enum ExpireError {
+ #[error("tried to expire already-deleted conversation: {0}")]
+ Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
#[error(transparent)]
@@ -234,3 +241,11 @@ impl From<LoadError> for ExpireError {
}
}
}
+
+impl From<history::DeleteError> for ExpireError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()),
+ }
+ }
+}