summaryrefslogtreecommitdiff
path: root/src/message
diff options
context:
space:
mode:
Diffstat (limited to 'src/message')
-rw-r--r--src/message/app.rs83
-rw-r--r--src/message/handlers/delete/mod.rs7
-rw-r--r--src/message/handlers/delete/test.rs18
-rw-r--r--src/message/history.rs70
-rw-r--r--src/message/repo.rs148
5 files changed, 179 insertions, 147 deletions
diff --git a/src/message/app.rs b/src/message/app.rs
index 9100224..647152e 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -2,12 +2,12 @@ use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
-use super::{Body, Id, Message, repo::Provider as _};
+use super::{Body, History, Id, Message, history, repo::Provider as _};
use crate::{
clock::DateTime,
conversation::{self, repo::Provider as _},
db::NotFound as _,
- event::{Broadcaster, Event, Sequence, repo::Provider as _},
+ event::{Broadcaster, Sequence, repo::Provider as _},
login::Login,
name,
user::{self, repo::Provider as _},
@@ -52,14 +52,17 @@ impl<'a> Messages<'a> {
let sent = tx.sequence().next(sent_at).await?;
let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?;
let sender = sender.as_of(sent).ok_or_else(sender_deleted)?;
- let message = tx
- .messages()
- .create(&conversation, &sender, &sent, body)
- .await?;
+ let message = History::begin(&conversation, &sender, body, sent);
+
+ // This filter technically includes every event in the history, but it's easier to follow if
+ // the various event-manipulating app methods are consistent, and it's harmless to have an
+ // always-satisfied filter.
+ let events = message.events().filter(Sequence::start_from(sent));
+ tx.messages().record_events(events.clone()).await?;
+
tx.commit().await?;
- self.events
- .broadcast(message.events().map(Event::from).collect::<Vec<_>>());
+ self.events.broadcast_from(events);
Ok(message.as_sent())
}
@@ -71,38 +74,24 @@ impl<'a> Messages<'a> {
deleted_at: &DateTime,
) -> Result<(), DeleteError> {
let message_not_found = || DeleteError::MessageNotFound(message.clone());
- let message_deleted = || DeleteError::Deleted(message.clone());
- let deleter_not_found = || DeleteError::UserNotFound(deleted_by.id.clone().into());
- let deleter_deleted = || DeleteError::UserDeleted(deleted_by.id.clone().into());
let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into());
let mut tx = self.db.begin().await?;
+
let message = tx
.messages()
.by_id(message)
.await
.not_found(message_not_found)?;
- let deleted_by = tx
- .users()
- .by_login(deleted_by)
- .await
- .not_found(deleter_not_found)?;
-
- let deleted = tx.sequence().next(deleted_at).await?;
- let message = message.as_of(deleted).ok_or_else(message_deleted)?;
- let deleted_by = deleted_by.as_of(deleted).ok_or_else(deleter_deleted)?;
+ if message.sender() == &deleted_by.id {
+ let deleted_at = tx.sequence().next(deleted_at).await?;
+ let message = message.delete(deleted_at)?;
- if message.sender == deleted_by.id {
- let message = tx.messages().delete(&message, &deleted).await?;
+ let events = message.events().filter(Sequence::start_from(deleted_at));
+ tx.messages().record_events(events.clone()).await?;
tx.commit().await?;
- self.events.broadcast(
- message
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ self.events.broadcast_from(events);
Ok(())
} else {
@@ -120,25 +109,23 @@ impl<'a> Messages<'a> {
let mut events = Vec::with_capacity(expired.len());
for message in expired {
let deleted = tx.sequence().next(relative_to).await?;
- if let Some(message) = message.as_of(deleted) {
- let message = tx.messages().delete(&message, &deleted).await?;
- events.push(
- message
+ match message.delete(deleted) {
+ Ok(message) => {
+ let message_events = message
.events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
+ .filter(Sequence::start_from(deleted.sequence));
+ tx.messages().record_events(message_events.clone()).await?;
+
+ events.push(message_events);
+ }
+ Err(history::DeleteError::Deleted(_)) => {}
}
}
tx.commit().await?;
- self.events.broadcast(
- events
- .into_iter()
- .kmerge_by(Sequence::merge)
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ self.events
+ .broadcast_from(events.into_iter().kmerge_by(Sequence::merge));
Ok(())
}
@@ -195,10 +182,6 @@ impl From<user::repo::LoadError> for SendError {
pub enum DeleteError {
#[error("message {0} not found")]
MessageNotFound(Id),
- #[error("user {0} not found")]
- UserNotFound(user::Id),
- #[error("user {0} deleted")]
- UserDeleted(user::Id),
#[error("user {0} not the message's sender")]
NotSender(user::Id),
#[error("message {0} deleted")]
@@ -218,3 +201,11 @@ impl From<user::repo::LoadError> for DeleteError {
}
}
}
+
+impl From<history::DeleteError> for DeleteError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(message) => Self::Deleted(message.id().clone()),
+ }
+ }
+}
diff --git a/src/message/handlers/delete/mod.rs b/src/message/handlers/delete/mod.rs
index 606f502..3e9a212 100644
--- a/src/message/handlers/delete/mod.rs
+++ b/src/message/handlers/delete/mod.rs
@@ -51,10 +51,9 @@ impl IntoResponse for Error {
DeleteError::MessageNotFound(_) | DeleteError::Deleted(_) => {
NotFound(error).into_response()
}
- DeleteError::UserNotFound(_)
- | DeleteError::UserDeleted(_)
- | DeleteError::Database(_)
- | DeleteError::Name(_) => Internal::from(error).into_response(),
+ DeleteError::Database(_) | DeleteError::Name(_) => {
+ Internal::from(error).into_response()
+ }
}
}
}
diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs
index d0e1794..05d9344 100644
--- a/src/message/handlers/delete/test.rs
+++ b/src/message/handlers/delete/test.rs
@@ -70,23 +70,23 @@ pub async fn delete_deleted() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let sender = fixtures::identity::create(&app, &fixtures::now()).await;
let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
- let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await;
+ let message =
+ fixtures::message::send(&app, &conversation, &sender.login, &fixtures::now()).await;
app.messages()
- .delete(&sender, &message.id, &fixtures::now())
+ .delete(&sender.login, &message.id, &fixtures::now())
.await
.expect("deleting a recently-sent message succeeds");
// Send the request
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
let super::Error(error) = super::handler(
State(app.clone()),
Path(message.id.clone()),
fixtures::now(),
- deleter,
+ sender,
)
.await
.expect_err("deleting a deleted message fails");
@@ -101,9 +101,10 @@ pub async fn delete_expired() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let sender = fixtures::user::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::identity::create(&app, &fixtures::ancient()).await;
let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await;
- let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await;
+ let message =
+ fixtures::message::send(&app, &conversation, &sender.login, &fixtures::ancient()).await;
app.messages()
.expire(&fixtures::now())
@@ -112,12 +113,11 @@ pub async fn delete_expired() {
// Send the request
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
let super::Error(error) = super::handler(
State(app.clone()),
Path(message.id.clone()),
fixtures::now(),
- deleter,
+ sender,
)
.await
.expect_err("deleting an expired message fails");
diff --git a/src/message/history.rs b/src/message/history.rs
index 2abdf2c..92cecc9 100644
--- a/src/message/history.rs
+++ b/src/message/history.rs
@@ -1,18 +1,67 @@
use itertools::Itertools as _;
use super::{
- Message,
+ Body, Id, Message,
event::{Deleted, Event, Sent},
};
-use crate::event::Sequence;
+use crate::{
+ conversation::Conversation,
+ event::{Instant, Sequence},
+ user::{self, User},
+};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
pub message: Message,
}
+// Lifecycle interface
+impl History {
+ pub fn begin(conversation: &Conversation, sender: &User, body: &Body, sent: Instant) -> Self {
+ Self {
+ message: Message {
+ id: Id::generate(),
+ conversation: conversation.id.clone(),
+ sender: sender.id.clone(),
+ body: body.clone(),
+ sent,
+ deleted: None,
+ },
+ }
+ }
+
+ pub fn delete(self, deleted: Instant) -> Result<Self, DeleteError> {
+ if self.message.deleted.is_none() {
+ Ok(Self {
+ message: Message {
+ deleted: Some(deleted),
+ ..self.message
+ },
+ })
+ } else {
+ Err(DeleteError::Deleted(self.into()))
+ }
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum DeleteError {
+ #[error("message {} already deleted", .0.message.id)]
+ // Payload is boxed here to avoid copying an entire `History` around in any errors this error
+ // gets chained into. See <https://rust-lang.github.io/rust-clippy/master/index.html#result_large_err>.
+ Deleted(Box<History>),
+}
+
// State interface
impl History {
+ pub fn id(&self) -> &Id {
+ &self.message.id
+ }
+
+ pub fn sender(&self) -> &user::Id {
+ &self.message.sender
+ }
+
// Snapshot of this message as it was when sent. (Note to the future: it's okay
// if this returns a redacted or modified version of the message. If we
// implement message editing by redacting the original body, then this should
@@ -30,15 +79,16 @@ impl History {
.filter(Sequence::up_to(sequence.into()))
.collect()
}
-
- // Snapshot of this message as of all events recorded in this history.
- pub fn as_snapshot(&self) -> Option<Message> {
- self.events().collect()
- }
}
// Events interface
impl History {
+ pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> {
+ [self.sent()]
+ .into_iter()
+ .merge_by(self.deleted(), Sequence::merge)
+ }
+
fn sent(&self) -> Event {
Sent {
message: self.message.clone(),
@@ -55,10 +105,4 @@ impl History {
.into()
})
}
-
- pub fn events(&self) -> impl Iterator<Item = Event> + use<> {
- [self.sent()]
- .into_iter()
- .merge_by(self.deleted(), Sequence::merge)
- }
}
diff --git a/src/message/repo.rs b/src/message/repo.rs
index 83bf0d5..4f66bdc 100644
--- a/src/message/repo.rs
+++ b/src/message/repo.rs
@@ -1,11 +1,14 @@
use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
-use super::{Body, History, Id, snapshot::Message};
+use super::{
+ Body, Event, History, Id, Message,
+ event::{Deleted, Sent},
+};
use crate::{
clock::DateTime,
- conversation::{self, Conversation},
+ conversation,
event::{Instant, Sequence},
- user::{self, User},
+ user,
};
pub trait Provider {
@@ -21,50 +24,84 @@ impl Provider for Transaction<'_, Sqlite> {
pub struct Messages<'t>(&'t mut SqliteConnection);
impl Messages<'_> {
- pub async fn create(
+ pub async fn record_events(
&mut self,
- conversation: &Conversation,
- sender: &User,
- sent: &Instant,
- body: &Body,
- ) -> Result<History, sqlx::Error> {
- let id = Id::generate();
+ events: impl IntoIterator<Item = Event>,
+ ) -> Result<(), sqlx::Error> {
+ for event in events {
+ self.record_event(&event).await?;
+ }
+ Ok(())
+ }
- let message = sqlx::query!(
+ pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> {
+ match event {
+ Event::Sent(sent) => self.record_sent(sent).await,
+ Event::Deleted(deleted) => self.record_deleted(deleted).await,
+ }
+ }
+
+ async fn record_sent(&mut self, sent: &Sent) -> Result<(), sqlx::Error> {
+ let Message {
+ id,
+ conversation,
+ sender,
+ body,
+ sent,
+ deleted: _,
+ } = &sent.message;
+
+ sqlx::query!(
r#"
insert into message
- (id, conversation, sender, sent_at, sent_sequence, body, last_sequence)
- values ($1, $2, $3, $4, $5, $6, $7)
- returning
- id as "id: Id",
- conversation as "conversation: conversation::Id",
- sender as "sender: user::Id",
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence",
- body as "body: Body"
+ (id, conversation, sender, body, sent_at, sent_sequence, last_sequence)
+ values ($1, $2, $3, $4, $5, $6, $6)
"#,
id,
- conversation.id,
- sender.id,
- sent.at,
- sent.sequence,
+ conversation,
+ sender,
body,
+ sent.at,
sent.sequence,
)
- .map(|row| History {
- message: Message {
- sent: Instant::new(row.sent_at, row.sent_sequence),
- conversation: row.conversation,
- sender: row.sender,
- id: row.id,
- body: row.body.unwrap_or_default(),
- deleted: None,
- },
- })
- .fetch_one(&mut *self.0)
+ .execute(&mut *self.0)
.await?;
- Ok(message)
+ Ok(())
+ }
+
+ async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> {
+ let Deleted { instant, id } = deleted;
+
+ sqlx::query!(
+ r#"
+ insert into message_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ "#,
+ id,
+ instant.at,
+ instant.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a message is deleted, its body is
+ // retconned to have been the empty string. Someone reading the event stream
+ // afterwards, or looking at messages in the conversation, cannot retrieve the
+ // "deleted" message by ignoring the deletion event.
+ sqlx::query!(
+ r#"
+ update message
+ set body = '', last_sequence = max(last_sequence, $1)
+ where id = $2
+ "#,
+ instant.sequence,
+ id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
}
pub async fn live(
@@ -178,45 +215,6 @@ impl Messages<'_> {
Ok(message)
}
- pub async fn delete(
- &mut self,
- message: &Message,
- deleted: &Instant,
- ) -> Result<History, sqlx::Error> {
- sqlx::query!(
- r#"
- insert into message_deleted (id, deleted_at, deleted_sequence)
- values ($1, $2, $3)
- "#,
- message.id,
- deleted.at,
- deleted.sequence,
- )
- .execute(&mut *self.0)
- .await?;
-
- // Small social responsibility hack here: when a message is deleted, its body is
- // retconned to have been the empty string. Someone reading the event stream
- // afterwards, or looking at messages in the conversation, cannot retrieve the
- // "deleted" message by ignoring the deletion event.
- sqlx::query!(
- r#"
- update message
- set body = '', last_sequence = max(last_sequence, $1)
- where id = $2
- returning id as "id: Id"
- "#,
- deleted.sequence,
- message.id,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- let message = self.by_id(&message.id).await?;
-
- Ok(message)
- }
-
pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let messages = sqlx::query_scalar!(
r#"