summaryrefslogtreecommitdiff
path: root/src/message
diff options
context:
space:
mode:
Diffstat (limited to 'src/message')
-rw-r--r--src/message/app.rs121
-rw-r--r--src/message/handlers/delete/mod.rs11
-rw-r--r--src/message/handlers/delete/test.rs8
-rw-r--r--src/message/history.rs15
-rw-r--r--src/message/repo.rs10
5 files changed, 113 insertions, 52 deletions
diff --git a/src/message/app.rs b/src/message/app.rs
index bdc2164..9100224 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -8,8 +8,9 @@ use crate::{
conversation::{self, repo::Provider as _},
db::NotFound as _,
event::{Broadcaster, Event, Sequence, repo::Provider as _},
+ login::Login,
name,
- user::User,
+ user::{self, repo::Provider as _},
};
pub struct Messages<'a> {
@@ -25,27 +26,35 @@ impl<'a> Messages<'a> {
pub async fn send(
&self,
conversation: &conversation::Id,
- sender: &User,
+ sender: &Login,
sent_at: &DateTime,
body: &Body,
) -> Result<Message, SendError> {
- let to_not_found = || SendError::ConversationNotFound(conversation.clone());
- let to_deleted = || SendError::ConversationDeleted(conversation.clone());
+ let conversation_not_found = || SendError::ConversationNotFound(conversation.clone());
+ let conversation_deleted = || SendError::ConversationDeleted(conversation.clone());
+ let sender_not_found = || SendError::SenderNotFound(sender.id.clone().into());
+ let sender_deleted = || SendError::SenderDeleted(sender.id.clone().into());
let mut tx = self.db.begin().await?;
let conversation = tx
.conversations()
.by_id(conversation)
.await
- .not_found(to_not_found)?;
+ .not_found(conversation_not_found)?;
+ let sender = tx
+ .users()
+ .by_login(sender)
+ .await
+ .not_found(sender_not_found)?;
// Ordering: don't bother allocating a sequence number before we know the channel might
// exist.
let sent = tx.sequence().next(sent_at).await?;
- let conversation = conversation.as_of(sent).ok_or_else(to_deleted)?;
+ let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?;
+ let sender = sender.as_of(sent).ok_or_else(sender_deleted)?;
let message = tx
.messages()
- .create(&conversation, sender, &sent, body)
+ .create(&conversation, &sender, &sent, body)
.await?;
tx.commit().await?;
@@ -57,36 +66,48 @@ impl<'a> Messages<'a> {
pub async fn delete(
&self,
- deleted_by: &User,
+ deleted_by: &Login,
message: &Id,
deleted_at: &DateTime,
) -> Result<(), DeleteError> {
+ let message_not_found = || DeleteError::MessageNotFound(message.clone());
+ let message_deleted = || DeleteError::Deleted(message.clone());
+ let deleter_not_found = || DeleteError::UserNotFound(deleted_by.id.clone().into());
+ let deleter_deleted = || DeleteError::UserDeleted(deleted_by.id.clone().into());
+ let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into());
+
let mut tx = self.db.begin().await?;
let message = tx
.messages()
.by_id(message)
.await
- .not_found(|| DeleteError::NotFound(message.clone()))?;
- let snapshot = message
- .as_snapshot()
- .ok_or_else(|| DeleteError::Deleted(message.id().clone()))?;
- if snapshot.sender != deleted_by.id {
- return Err(DeleteError::NotSender(deleted_by.clone()));
- }
+ .not_found(message_not_found)?;
+ let deleted_by = tx
+ .users()
+ .by_login(deleted_by)
+ .await
+ .not_found(deleter_not_found)?;
let deleted = tx.sequence().next(deleted_at).await?;
- let message = tx.messages().delete(&message, &deleted).await?;
- tx.commit().await?;
+ let message = message.as_of(deleted).ok_or_else(message_deleted)?;
+ let deleted_by = deleted_by.as_of(deleted).ok_or_else(deleter_deleted)?;
- self.events.broadcast(
- message
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ if message.sender == deleted_by.id {
+ let message = tx.messages().delete(&message, &deleted).await?;
+ tx.commit().await?;
- Ok(())
+ self.events.broadcast(
+ message
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from)
+ .collect::<Vec<_>>(),
+ );
+
+ Ok(())
+ } else {
+ Err(not_sender())
+ }
}
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
@@ -99,12 +120,14 @@ impl<'a> Messages<'a> {
let mut events = Vec::with_capacity(expired.len());
for message in expired {
let deleted = tx.sequence().next(relative_to).await?;
- let message = tx.messages().delete(&message, &deleted).await?;
- events.push(
- message
- .events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
+ if let Some(message) = message.as_of(deleted) {
+ let message = tx.messages().delete(&message, &deleted).await?;
+ events.push(
+ message
+ .events()
+ .filter(Sequence::start_from(deleted.sequence)),
+ );
+ }
}
tx.commit().await?;
@@ -138,6 +161,10 @@ pub enum SendError {
ConversationNotFound(conversation::Id),
#[error("conversation {0} deleted")]
ConversationDeleted(conversation::Id),
+ #[error("user {0} not found")]
+ SenderNotFound(user::Id),
+ #[error("user {0} deleted")]
+ SenderDeleted(user::Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
#[error(transparent)]
@@ -154,14 +181,40 @@ impl From<conversation::repo::LoadError> for SendError {
}
}
+impl From<user::repo::LoadError> for SendError {
+ fn from(error: user::repo::LoadError) -> Self {
+ use user::repo::LoadError;
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
+}
+
#[derive(Debug, thiserror::Error)]
pub enum DeleteError {
#[error("message {0} not found")]
- NotFound(Id),
- #[error("user {} not the message's sender", .0.id)]
- NotSender(User),
+ MessageNotFound(Id),
+ #[error("user {0} not found")]
+ UserNotFound(user::Id),
+ #[error("user {0} deleted")]
+ UserDeleted(user::Id),
+ #[error("user {0} not the message's sender")]
+ NotSender(user::Id),
#[error("message {0} deleted")]
Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Name(#[from] name::Error),
+}
+
+impl From<user::repo::LoadError> for DeleteError {
+ fn from(error: user::repo::LoadError) -> Self {
+ use user::repo::LoadError;
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
}
diff --git a/src/message/handlers/delete/mod.rs b/src/message/handlers/delete/mod.rs
index 5eac4eb..606f502 100644
--- a/src/message/handlers/delete/mod.rs
+++ b/src/message/handlers/delete/mod.rs
@@ -22,7 +22,7 @@ pub async fn handler(
identity: Identity,
) -> Result<Response, Error> {
app.messages()
- .delete(&identity.user, &message, &deleted_at)
+ .delete(&identity.login, &message, &deleted_at)
.await?;
Ok(Response { id: message })
@@ -48,8 +48,13 @@ impl IntoResponse for Error {
let Self(error) = self;
match error {
DeleteError::NotSender(_) => (StatusCode::FORBIDDEN, error.to_string()).into_response(),
- DeleteError::NotFound(_) | DeleteError::Deleted(_) => NotFound(error).into_response(),
- DeleteError::Database(_) => Internal::from(error).into_response(),
+ DeleteError::MessageNotFound(_) | DeleteError::Deleted(_) => {
+ NotFound(error).into_response()
+ }
+ DeleteError::UserNotFound(_)
+ | DeleteError::UserDeleted(_)
+ | DeleteError::Database(_)
+ | DeleteError::Name(_) => Internal::from(error).into_response(),
}
}
}
diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs
index 371c7bf..d0e1794 100644
--- a/src/message/handlers/delete/test.rs
+++ b/src/message/handlers/delete/test.rs
@@ -11,7 +11,7 @@ pub async fn delete_message() {
let sender = fixtures::identity::create(&app, &fixtures::now()).await;
let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
let message =
- fixtures::message::send(&app, &conversation, &sender.user, &fixtures::now()).await;
+ fixtures::message::send(&app, &conversation, &sender.login, &fixtures::now()).await;
// Send the request
@@ -62,7 +62,7 @@ pub async fn delete_invalid_message_id() {
// Verify the response
- assert!(matches!(error, app::DeleteError::NotFound(id) if id == message));
+ assert!(matches!(error, app::DeleteError::MessageNotFound(id) if id == message));
}
#[tokio::test]
@@ -160,7 +160,7 @@ pub async fn delete_purged() {
// Verify the response
- assert!(matches!(error, app::DeleteError::NotFound(id) if id == message.id));
+ assert!(matches!(error, app::DeleteError::MessageNotFound(id) if id == message.id));
}
#[tokio::test]
@@ -187,6 +187,6 @@ pub async fn delete_not_sender() {
// Verify the response
assert!(
- matches!(error, app::DeleteError::NotSender(error_sender) if deleter.user == error_sender)
+ matches!(error, app::DeleteError::NotSender(error_sender) if deleter.login.id == error_sender)
);
}
diff --git a/src/message/history.rs b/src/message/history.rs
index d4d4500..2abdf2c 100644
--- a/src/message/history.rs
+++ b/src/message/history.rs
@@ -1,7 +1,7 @@
use itertools::Itertools as _;
use super::{
- Id, Message,
+ Message,
event::{Deleted, Event, Sent},
};
use crate::event::Sequence;
@@ -13,10 +13,6 @@ pub struct History {
// State interface
impl History {
- pub fn id(&self) -> &Id {
- &self.message.id
- }
-
// Snapshot of this message as it was when sent. (Note to the future: it's okay
// if this returns a redacted or modified version of the message. If we
// implement message editing by redacting the original body, then this should
@@ -26,6 +22,15 @@ impl History {
self.message.clone()
}
+ pub fn as_of<S>(&self, sequence: S) -> Option<Message>
+ where
+ S: Into<Sequence>,
+ {
+ self.events()
+ .filter(Sequence::up_to(sequence.into()))
+ .collect()
+ }
+
// Snapshot of this message as of all events recorded in this history.
pub fn as_snapshot(&self) -> Option<Message> {
self.events().collect()
diff --git a/src/message/repo.rs b/src/message/repo.rs
index 2e9700a..83bf0d5 100644
--- a/src/message/repo.rs
+++ b/src/message/repo.rs
@@ -180,17 +180,15 @@ impl Messages<'_> {
pub async fn delete(
&mut self,
- message: &History,
+ message: &Message,
deleted: &Instant,
) -> Result<History, sqlx::Error> {
- let id = message.id();
-
sqlx::query!(
r#"
insert into message_deleted (id, deleted_at, deleted_sequence)
values ($1, $2, $3)
"#,
- id,
+ message.id,
deleted.at,
deleted.sequence,
)
@@ -209,12 +207,12 @@ impl Messages<'_> {
returning id as "id: Id"
"#,
deleted.sequence,
- id,
+ message.id,
)
.fetch_one(&mut *self.0)
.await?;
- let message = self.by_id(id).await?;
+ let message = self.by_id(&message.id).await?;
Ok(message)
}