summaryrefslogtreecommitdiff
path: root/src/message/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/message/app.rs')
-rw-r--r--src/message/app.rs121
1 files changed, 87 insertions, 34 deletions
diff --git a/src/message/app.rs b/src/message/app.rs
index bdc2164..9100224 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -8,8 +8,9 @@ use crate::{
conversation::{self, repo::Provider as _},
db::NotFound as _,
event::{Broadcaster, Event, Sequence, repo::Provider as _},
+ login::Login,
name,
- user::User,
+ user::{self, repo::Provider as _},
};
pub struct Messages<'a> {
@@ -25,27 +26,35 @@ impl<'a> Messages<'a> {
pub async fn send(
&self,
conversation: &conversation::Id,
- sender: &User,
+ sender: &Login,
sent_at: &DateTime,
body: &Body,
) -> Result<Message, SendError> {
- let to_not_found = || SendError::ConversationNotFound(conversation.clone());
- let to_deleted = || SendError::ConversationDeleted(conversation.clone());
+ let conversation_not_found = || SendError::ConversationNotFound(conversation.clone());
+ let conversation_deleted = || SendError::ConversationDeleted(conversation.clone());
+ let sender_not_found = || SendError::SenderNotFound(sender.id.clone().into());
+ let sender_deleted = || SendError::SenderDeleted(sender.id.clone().into());
let mut tx = self.db.begin().await?;
let conversation = tx
.conversations()
.by_id(conversation)
.await
- .not_found(to_not_found)?;
+ .not_found(conversation_not_found)?;
+ let sender = tx
+ .users()
+ .by_login(sender)
+ .await
+ .not_found(sender_not_found)?;
// Ordering: don't bother allocating a sequence number before we know the channel might
// exist.
let sent = tx.sequence().next(sent_at).await?;
- let conversation = conversation.as_of(sent).ok_or_else(to_deleted)?;
+ let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?;
+ let sender = sender.as_of(sent).ok_or_else(sender_deleted)?;
let message = tx
.messages()
- .create(&conversation, sender, &sent, body)
+ .create(&conversation, &sender, &sent, body)
.await?;
tx.commit().await?;
@@ -57,36 +66,48 @@ impl<'a> Messages<'a> {
pub async fn delete(
&self,
- deleted_by: &User,
+ deleted_by: &Login,
message: &Id,
deleted_at: &DateTime,
) -> Result<(), DeleteError> {
+ let message_not_found = || DeleteError::MessageNotFound(message.clone());
+ let message_deleted = || DeleteError::Deleted(message.clone());
+ let deleter_not_found = || DeleteError::UserNotFound(deleted_by.id.clone().into());
+ let deleter_deleted = || DeleteError::UserDeleted(deleted_by.id.clone().into());
+ let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into());
+
let mut tx = self.db.begin().await?;
let message = tx
.messages()
.by_id(message)
.await
- .not_found(|| DeleteError::NotFound(message.clone()))?;
- let snapshot = message
- .as_snapshot()
- .ok_or_else(|| DeleteError::Deleted(message.id().clone()))?;
- if snapshot.sender != deleted_by.id {
- return Err(DeleteError::NotSender(deleted_by.clone()));
- }
+ .not_found(message_not_found)?;
+ let deleted_by = tx
+ .users()
+ .by_login(deleted_by)
+ .await
+ .not_found(deleter_not_found)?;
let deleted = tx.sequence().next(deleted_at).await?;
- let message = tx.messages().delete(&message, &deleted).await?;
- tx.commit().await?;
+ let message = message.as_of(deleted).ok_or_else(message_deleted)?;
+ let deleted_by = deleted_by.as_of(deleted).ok_or_else(deleter_deleted)?;
- self.events.broadcast(
- message
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ if message.sender == deleted_by.id {
+ let message = tx.messages().delete(&message, &deleted).await?;
+ tx.commit().await?;
- Ok(())
+ self.events.broadcast(
+ message
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from)
+ .collect::<Vec<_>>(),
+ );
+
+ Ok(())
+ } else {
+ Err(not_sender())
+ }
}
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
@@ -99,12 +120,14 @@ impl<'a> Messages<'a> {
let mut events = Vec::with_capacity(expired.len());
for message in expired {
let deleted = tx.sequence().next(relative_to).await?;
- let message = tx.messages().delete(&message, &deleted).await?;
- events.push(
- message
- .events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
+ if let Some(message) = message.as_of(deleted) {
+ let message = tx.messages().delete(&message, &deleted).await?;
+ events.push(
+ message
+ .events()
+ .filter(Sequence::start_from(deleted.sequence)),
+ );
+ }
}
tx.commit().await?;
@@ -138,6 +161,10 @@ pub enum SendError {
ConversationNotFound(conversation::Id),
#[error("conversation {0} deleted")]
ConversationDeleted(conversation::Id),
+ #[error("user {0} not found")]
+ SenderNotFound(user::Id),
+ #[error("user {0} deleted")]
+ SenderDeleted(user::Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
#[error(transparent)]
@@ -154,14 +181,40 @@ impl From<conversation::repo::LoadError> for SendError {
}
}
+impl From<user::repo::LoadError> for SendError {
+ fn from(error: user::repo::LoadError) -> Self {
+ use user::repo::LoadError;
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
+}
+
#[derive(Debug, thiserror::Error)]
pub enum DeleteError {
#[error("message {0} not found")]
- NotFound(Id),
- #[error("user {} not the message's sender", .0.id)]
- NotSender(User),
+ MessageNotFound(Id),
+ #[error("user {0} not found")]
+ UserNotFound(user::Id),
+ #[error("user {0} deleted")]
+ UserDeleted(user::Id),
+ #[error("user {0} not the message's sender")]
+ NotSender(user::Id),
#[error("message {0} deleted")]
Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Name(#[from] name::Error),
+}
+
+impl From<user::repo::LoadError> for DeleteError {
+ fn from(error: user::repo::LoadError) -> Self {
+ use user::repo::LoadError;
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
}