diff options
Diffstat (limited to 'src/message')
| -rw-r--r-- | src/message/app.rs | 72 | ||||
| -rw-r--r-- | src/message/handlers/delete/mod.rs | 7 | ||||
| -rw-r--r-- | src/message/handlers/delete/test.rs | 18 | ||||
| -rw-r--r-- | src/message/history.rs | 70 | ||||
| -rw-r--r-- | src/message/repo.rs | 148 |
5 files changed, 177 insertions, 138 deletions
diff --git a/src/message/app.rs b/src/message/app.rs index 9100224..f0a62d0 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -2,7 +2,7 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{Body, Id, Message, repo::Provider as _}; +use super::{Body, History, Id, Message, history, repo::Provider as _}; use crate::{ clock::DateTime, conversation::{self, repo::Provider as _}, @@ -52,14 +52,18 @@ impl<'a> Messages<'a> { let sent = tx.sequence().next(sent_at).await?; let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?; let sender = sender.as_of(sent).ok_or_else(sender_deleted)?; - let message = tx - .messages() - .create(&conversation, &sender, &sent, body) - .await?; + let message = History::begin(&conversation, &sender, body, sent); + + // This filter technically includes every event in the history, but it's easier to follow if + // the various event-manipulating app methods are consistent, and it's harmless to have an + // always-satisfied filter. + let events = message.events().filter(Sequence::start_from(sent)); + tx.messages().record_events(events.clone()).await?; + tx.commit().await?; self.events - .broadcast(message.events().map(Event::from).collect::<Vec<_>>()); + .broadcast(events.map(Event::from).collect::<Vec<_>>()); Ok(message.as_sent()) } @@ -71,38 +75,25 @@ impl<'a> Messages<'a> { deleted_at: &DateTime, ) -> Result<(), DeleteError> { let message_not_found = || DeleteError::MessageNotFound(message.clone()); - let message_deleted = || DeleteError::Deleted(message.clone()); - let deleter_not_found = || DeleteError::UserNotFound(deleted_by.id.clone().into()); - let deleter_deleted = || DeleteError::UserDeleted(deleted_by.id.clone().into()); let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into()); let mut tx = self.db.begin().await?; + let message = tx .messages() .by_id(message) .await .not_found(message_not_found)?; - let deleted_by = tx - .users() - .by_login(deleted_by) - .await - .not_found(deleter_not_found)?; - - let deleted = tx.sequence().next(deleted_at).await?; - let message = message.as_of(deleted).ok_or_else(message_deleted)?; - let deleted_by = deleted_by.as_of(deleted).ok_or_else(deleter_deleted)?; + if message.sender() == &deleted_by.id { + let deleted_at = tx.sequence().next(deleted_at).await?; + let message = message.delete(deleted_at)?; - if message.sender == deleted_by.id { - let message = tx.messages().delete(&message, &deleted).await?; + let events = message.events().filter(Sequence::start_from(deleted_at)); + tx.messages().record_events(events.clone()).await?; tx.commit().await?; - self.events.broadcast( - message - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from) - .collect::<Vec<_>>(), - ); + self.events + .broadcast(events.map(Event::from).collect::<Vec<_>>()); Ok(()) } else { @@ -120,13 +111,16 @@ impl<'a> Messages<'a> { let mut events = Vec::with_capacity(expired.len()); for message in expired { let deleted = tx.sequence().next(relative_to).await?; - if let Some(message) = message.as_of(deleted) { - let message = tx.messages().delete(&message, &deleted).await?; - events.push( - message + match message.delete(deleted) { + Ok(message) => { + let message_events = message .events() - .filter(Sequence::start_from(deleted.sequence)), - ); + .filter(Sequence::start_from(deleted.sequence)); + tx.messages().record_events(message_events.clone()).await?; + + events.push(message_events); + } + Err(history::DeleteError::Deleted(_)) => {} } } @@ -195,10 +189,6 @@ impl From<user::repo::LoadError> for SendError { pub enum DeleteError { #[error("message {0} not found")] MessageNotFound(Id), - #[error("user {0} not found")] - UserNotFound(user::Id), - #[error("user {0} deleted")] - UserDeleted(user::Id), #[error("user {0} not the message's sender")] NotSender(user::Id), #[error("message {0} deleted")] @@ -218,3 +208,11 @@ impl From<user::repo::LoadError> for DeleteError { } } } + +impl From<history::DeleteError> for DeleteError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(message) => Self::Deleted(message.id().clone()), + } + } +} diff --git a/src/message/handlers/delete/mod.rs b/src/message/handlers/delete/mod.rs index 606f502..3e9a212 100644 --- a/src/message/handlers/delete/mod.rs +++ b/src/message/handlers/delete/mod.rs @@ -51,10 +51,9 @@ impl IntoResponse for Error { DeleteError::MessageNotFound(_) | DeleteError::Deleted(_) => { NotFound(error).into_response() } - DeleteError::UserNotFound(_) - | DeleteError::UserDeleted(_) - | DeleteError::Database(_) - | DeleteError::Name(_) => Internal::from(error).into_response(), + DeleteError::Database(_) | DeleteError::Name(_) => { + Internal::from(error).into_response() + } } } } diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs index d0e1794..05d9344 100644 --- a/src/message/handlers/delete/test.rs +++ b/src/message/handlers/delete/test.rs @@ -70,23 +70,23 @@ pub async fn delete_deleted() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; + let message = + fixtures::message::send(&app, &conversation, &sender.login, &fixtures::now()).await; app.messages() - .delete(&sender, &message.id, &fixtures::now()) + .delete(&sender.login, &message.id, &fixtures::now()) .await .expect("deleting a recently-sent message succeeds"); // Send the request - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let super::Error(error) = super::handler( State(app.clone()), Path(message.id.clone()), fixtures::now(), - deleter, + sender, ) .await .expect_err("deleting a deleted message fails"); @@ -101,9 +101,10 @@ pub async fn delete_expired() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::user::create(&app, &fixtures::ancient()).await; + let sender = fixtures::identity::create(&app, &fixtures::ancient()).await; let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; + let message = + fixtures::message::send(&app, &conversation, &sender.login, &fixtures::ancient()).await; app.messages() .expire(&fixtures::now()) @@ -112,12 +113,11 @@ pub async fn delete_expired() { // Send the request - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let super::Error(error) = super::handler( State(app.clone()), Path(message.id.clone()), fixtures::now(), - deleter, + sender, ) .await .expect_err("deleting an expired message fails"); diff --git a/src/message/history.rs b/src/message/history.rs index 2abdf2c..92cecc9 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -1,18 +1,67 @@ use itertools::Itertools as _; use super::{ - Message, + Body, Id, Message, event::{Deleted, Event, Sent}, }; -use crate::event::Sequence; +use crate::{ + conversation::Conversation, + event::{Instant, Sequence}, + user::{self, User}, +}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { pub message: Message, } +// Lifecycle interface +impl History { + pub fn begin(conversation: &Conversation, sender: &User, body: &Body, sent: Instant) -> Self { + Self { + message: Message { + id: Id::generate(), + conversation: conversation.id.clone(), + sender: sender.id.clone(), + body: body.clone(), + sent, + deleted: None, + }, + } + } + + pub fn delete(self, deleted: Instant) -> Result<Self, DeleteError> { + if self.message.deleted.is_none() { + Ok(Self { + message: Message { + deleted: Some(deleted), + ..self.message + }, + }) + } else { + Err(DeleteError::Deleted(self.into())) + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("message {} already deleted", .0.message.id)] + // Payload is boxed here to avoid copying an entire `History` around in any errors this error + // gets chained into. See <https://rust-lang.github.io/rust-clippy/master/index.html#result_large_err>. + Deleted(Box<History>), +} + // State interface impl History { + pub fn id(&self) -> &Id { + &self.message.id + } + + pub fn sender(&self) -> &user::Id { + &self.message.sender + } + // Snapshot of this message as it was when sent. (Note to the future: it's okay // if this returns a redacted or modified version of the message. If we // implement message editing by redacting the original body, then this should @@ -30,15 +79,16 @@ impl History { .filter(Sequence::up_to(sequence.into())) .collect() } - - // Snapshot of this message as of all events recorded in this history. - pub fn as_snapshot(&self) -> Option<Message> { - self.events().collect() - } } // Events interface impl History { + pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> { + [self.sent()] + .into_iter() + .merge_by(self.deleted(), Sequence::merge) + } + fn sent(&self) -> Event { Sent { message: self.message.clone(), @@ -55,10 +105,4 @@ impl History { .into() }) } - - pub fn events(&self) -> impl Iterator<Item = Event> + use<> { - [self.sent()] - .into_iter() - .merge_by(self.deleted(), Sequence::merge) - } } diff --git a/src/message/repo.rs b/src/message/repo.rs index 83bf0d5..4f66bdc 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -1,11 +1,14 @@ use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; -use super::{Body, History, Id, snapshot::Message}; +use super::{ + Body, Event, History, Id, Message, + event::{Deleted, Sent}, +}; use crate::{ clock::DateTime, - conversation::{self, Conversation}, + conversation, event::{Instant, Sequence}, - user::{self, User}, + user, }; pub trait Provider { @@ -21,50 +24,84 @@ impl Provider for Transaction<'_, Sqlite> { pub struct Messages<'t>(&'t mut SqliteConnection); impl Messages<'_> { - pub async fn create( + pub async fn record_events( &mut self, - conversation: &Conversation, - sender: &User, - sent: &Instant, - body: &Body, - ) -> Result<History, sqlx::Error> { - let id = Id::generate(); + events: impl IntoIterator<Item = Event>, + ) -> Result<(), sqlx::Error> { + for event in events { + self.record_event(&event).await?; + } + Ok(()) + } - let message = sqlx::query!( + pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { + match event { + Event::Sent(sent) => self.record_sent(sent).await, + Event::Deleted(deleted) => self.record_deleted(deleted).await, + } + } + + async fn record_sent(&mut self, sent: &Sent) -> Result<(), sqlx::Error> { + let Message { + id, + conversation, + sender, + body, + sent, + deleted: _, + } = &sent.message; + + sqlx::query!( r#" insert into message - (id, conversation, sender, sent_at, sent_sequence, body, last_sequence) - values ($1, $2, $3, $4, $5, $6, $7) - returning - id as "id: Id", - conversation as "conversation: conversation::Id", - sender as "sender: user::Id", - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence", - body as "body: Body" + (id, conversation, sender, body, sent_at, sent_sequence, last_sequence) + values ($1, $2, $3, $4, $5, $6, $6) "#, id, - conversation.id, - sender.id, - sent.at, - sent.sequence, + conversation, + sender, body, + sent.at, sent.sequence, ) - .map(|row| History { - message: Message { - sent: Instant::new(row.sent_at, row.sent_sequence), - conversation: row.conversation, - sender: row.sender, - id: row.id, - body: row.body.unwrap_or_default(), - deleted: None, - }, - }) - .fetch_one(&mut *self.0) + .execute(&mut *self.0) .await?; - Ok(message) + Ok(()) + } + + async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> { + let Deleted { instant, id } = deleted; + + sqlx::query!( + r#" + insert into message_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + instant.at, + instant.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a message is deleted, its body is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at messages in the conversation, cannot retrieve the + // "deleted" message by ignoring the deletion event. + sqlx::query!( + r#" + update message + set body = '', last_sequence = max(last_sequence, $1) + where id = $2 + "#, + instant.sequence, + id, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) } pub async fn live( @@ -178,45 +215,6 @@ impl Messages<'_> { Ok(message) } - pub async fn delete( - &mut self, - message: &Message, - deleted: &Instant, - ) -> Result<History, sqlx::Error> { - sqlx::query!( - r#" - insert into message_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - message.id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a message is deleted, its body is - // retconned to have been the empty string. Someone reading the event stream - // afterwards, or looking at messages in the conversation, cannot retrieve the - // "deleted" message by ignoring the deletion event. - sqlx::query!( - r#" - update message - set body = '', last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - message.id, - ) - .fetch_one(&mut *self.0) - .await?; - - let message = self.by_id(&message.id).await?; - - Ok(message) - } - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let messages = sqlx::query_scalar!( r#" |
