summaryrefslogtreecommitdiff
path: root/src/message/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/message/app.rs')
-rw-r--r--src/message/app.rs83
1 files changed, 37 insertions, 46 deletions
diff --git a/src/message/app.rs b/src/message/app.rs
index 9100224..647152e 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -2,12 +2,12 @@ use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
-use super::{Body, Id, Message, repo::Provider as _};
+use super::{Body, History, Id, Message, history, repo::Provider as _};
use crate::{
clock::DateTime,
conversation::{self, repo::Provider as _},
db::NotFound as _,
- event::{Broadcaster, Event, Sequence, repo::Provider as _},
+ event::{Broadcaster, Sequence, repo::Provider as _},
login::Login,
name,
user::{self, repo::Provider as _},
@@ -52,14 +52,17 @@ impl<'a> Messages<'a> {
let sent = tx.sequence().next(sent_at).await?;
let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?;
let sender = sender.as_of(sent).ok_or_else(sender_deleted)?;
- let message = tx
- .messages()
- .create(&conversation, &sender, &sent, body)
- .await?;
+ let message = History::begin(&conversation, &sender, body, sent);
+
+ // This filter technically includes every event in the history, but it's easier to follow if
+ // the various event-manipulating app methods are consistent, and it's harmless to have an
+ // always-satisfied filter.
+ let events = message.events().filter(Sequence::start_from(sent));
+ tx.messages().record_events(events.clone()).await?;
+
tx.commit().await?;
- self.events
- .broadcast(message.events().map(Event::from).collect::<Vec<_>>());
+ self.events.broadcast_from(events);
Ok(message.as_sent())
}
@@ -71,38 +74,24 @@ impl<'a> Messages<'a> {
deleted_at: &DateTime,
) -> Result<(), DeleteError> {
let message_not_found = || DeleteError::MessageNotFound(message.clone());
- let message_deleted = || DeleteError::Deleted(message.clone());
- let deleter_not_found = || DeleteError::UserNotFound(deleted_by.id.clone().into());
- let deleter_deleted = || DeleteError::UserDeleted(deleted_by.id.clone().into());
let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into());
let mut tx = self.db.begin().await?;
+
let message = tx
.messages()
.by_id(message)
.await
.not_found(message_not_found)?;
- let deleted_by = tx
- .users()
- .by_login(deleted_by)
- .await
- .not_found(deleter_not_found)?;
-
- let deleted = tx.sequence().next(deleted_at).await?;
- let message = message.as_of(deleted).ok_or_else(message_deleted)?;
- let deleted_by = deleted_by.as_of(deleted).ok_or_else(deleter_deleted)?;
+ if message.sender() == &deleted_by.id {
+ let deleted_at = tx.sequence().next(deleted_at).await?;
+ let message = message.delete(deleted_at)?;
- if message.sender == deleted_by.id {
- let message = tx.messages().delete(&message, &deleted).await?;
+ let events = message.events().filter(Sequence::start_from(deleted_at));
+ tx.messages().record_events(events.clone()).await?;
tx.commit().await?;
- self.events.broadcast(
- message
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ self.events.broadcast_from(events);
Ok(())
} else {
@@ -120,25 +109,23 @@ impl<'a> Messages<'a> {
let mut events = Vec::with_capacity(expired.len());
for message in expired {
let deleted = tx.sequence().next(relative_to).await?;
- if let Some(message) = message.as_of(deleted) {
- let message = tx.messages().delete(&message, &deleted).await?;
- events.push(
- message
+ match message.delete(deleted) {
+ Ok(message) => {
+ let message_events = message
.events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
+ .filter(Sequence::start_from(deleted.sequence));
+ tx.messages().record_events(message_events.clone()).await?;
+
+ events.push(message_events);
+ }
+ Err(history::DeleteError::Deleted(_)) => {}
}
}
tx.commit().await?;
- self.events.broadcast(
- events
- .into_iter()
- .kmerge_by(Sequence::merge)
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ self.events
+ .broadcast_from(events.into_iter().kmerge_by(Sequence::merge));
Ok(())
}
@@ -195,10 +182,6 @@ impl From<user::repo::LoadError> for SendError {
pub enum DeleteError {
#[error("message {0} not found")]
MessageNotFound(Id),
- #[error("user {0} not found")]
- UserNotFound(user::Id),
- #[error("user {0} deleted")]
- UserDeleted(user::Id),
#[error("user {0} not the message's sender")]
NotSender(user::Id),
#[error("message {0} deleted")]
@@ -218,3 +201,11 @@ impl From<user::repo::LoadError> for DeleteError {
}
}
}
+
+impl From<history::DeleteError> for DeleteError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(message) => Self::Deleted(message.id().clone()),
+ }
+ }
+}