diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-08-26 03:17:02 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-08-26 18:05:00 -0400 |
| commit | 1e0493f079d011df56fe2ec93c44a0fea38f0531 (patch) | |
| tree | 0936a24c2fd2078249f21d06a80cbba984c79e74 | |
| parent | ca4ac1d0f12532c38d4041aba6ae50ae4093ae13 (diff) | |
Store `Message` instances using their events.
I found a test bug! The tests for deleting previously-deleted or previously-expired tests were using the wrong user to try to delete those messages. The tests happened to pass anyways because the message authorship check was done after the message lifecycle check. They would have no longer passed; the tests are fixed to use the sender, instead.
| -rw-r--r-- | .sqlx/query-427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561.json | 50 | ||||
| -rw-r--r-- | .sqlx/query-47bf3a66c20225f28c6c7f2d5ee0e8b184e5269874b17715e2fa2f1f520bd83f.json | 12 | ||||
| -rw-r--r-- | .sqlx/query-53c5c19f2e284b45b50fe3b5acc118678830ae380dbed94542e85e294b3d9ace.json | 12 | ||||
| -rw-r--r-- | .sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json | 20 | ||||
| -rw-r--r-- | src/conversation/app.rs | 11 | ||||
| -rw-r--r-- | src/message/app.rs | 72 | ||||
| -rw-r--r-- | src/message/handlers/delete/mod.rs | 7 | ||||
| -rw-r--r-- | src/message/handlers/delete/test.rs | 18 | ||||
| -rw-r--r-- | src/message/history.rs | 70 | ||||
| -rw-r--r-- | src/message/repo.rs | 148 |
10 files changed, 208 insertions, 212 deletions
diff --git a/.sqlx/query-427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561.json b/.sqlx/query-427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561.json deleted file mode 100644 index 82db559..0000000 --- a/.sqlx/query-427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert into message\n (id, conversation, sender, sent_at, sent_sequence, body, last_sequence)\n values ($1, $2, $3, $4, $5, $6, $7)\n returning\n id as \"id: Id\",\n conversation as \"conversation: conversation::Id\",\n sender as \"sender: user::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n body as \"body: Body\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "conversation: conversation::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: user::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - } - ], - "parameters": { - "Right": 7 - }, - "nullable": [ - false, - false, - false, - false, - false, - true - ] - }, - "hash": "427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561" -} diff --git a/.sqlx/query-47bf3a66c20225f28c6c7f2d5ee0e8b184e5269874b17715e2fa2f1f520bd83f.json b/.sqlx/query-47bf3a66c20225f28c6c7f2d5ee0e8b184e5269874b17715e2fa2f1f520bd83f.json new file mode 100644 index 0000000..ce70fcd --- /dev/null +++ b/.sqlx/query-47bf3a66c20225f28c6c7f2d5ee0e8b184e5269874b17715e2fa2f1f520bd83f.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n update message\n set body = '', last_sequence = max(last_sequence, $1)\n where id = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + }, + "hash": "47bf3a66c20225f28c6c7f2d5ee0e8b184e5269874b17715e2fa2f1f520bd83f" +} diff --git a/.sqlx/query-53c5c19f2e284b45b50fe3b5acc118678830ae380dbed94542e85e294b3d9ace.json b/.sqlx/query-53c5c19f2e284b45b50fe3b5acc118678830ae380dbed94542e85e294b3d9ace.json new file mode 100644 index 0000000..85e43c8 --- /dev/null +++ b/.sqlx/query-53c5c19f2e284b45b50fe3b5acc118678830ae380dbed94542e85e294b3d9ace.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n insert into message\n (id, conversation, sender, body, sent_at, sent_sequence, last_sequence)\n values ($1, $2, $3, $4, $5, $6, $6)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 6 + }, + "nullable": [] + }, + "hash": "53c5c19f2e284b45b50fe3b5acc118678830ae380dbed94542e85e294b3d9ace" +} diff --git a/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json b/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json deleted file mode 100644 index 5179e74..0000000 --- a/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n update message\n set body = '', last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - false - ] - }, - "hash": "64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4" -} diff --git a/src/conversation/app.rs b/src/conversation/app.rs index 30baf77..5e07292 100644 --- a/src/conversation/app.rs +++ b/src/conversation/app.rs @@ -86,18 +86,21 @@ impl<'a> Conversations<'a> { .not_found(|| DeleteError::NotFound(conversation.clone()))?; let messages = tx.messages().live(&conversation).await?; + let deleted_at = tx.sequence().next(deleted_at).await?; + let has_messages = messages .iter() - .map(message::History::as_snapshot) + .map(|message| message.as_of(deleted_at)) .any(|message| message.is_some()); if has_messages { return Err(DeleteError::NotEmpty(conversation.id().clone())); } - let deleted = tx.sequence().next(deleted_at).await?; - let conversation = conversation.delete(deleted)?; + let conversation = conversation.delete(deleted_at)?; - let events = conversation.events().filter(Sequence::start_from(deleted)); + let events = conversation + .events() + .filter(Sequence::start_from(deleted_at)); tx.conversations().record_events(events.clone()).await?; tx.commit().await?; diff --git a/src/message/app.rs b/src/message/app.rs index 9100224..f0a62d0 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -2,7 +2,7 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{Body, Id, Message, repo::Provider as _}; +use super::{Body, History, Id, Message, history, repo::Provider as _}; use crate::{ clock::DateTime, conversation::{self, repo::Provider as _}, @@ -52,14 +52,18 @@ impl<'a> Messages<'a> { let sent = tx.sequence().next(sent_at).await?; let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?; let sender = sender.as_of(sent).ok_or_else(sender_deleted)?; - let message = tx - .messages() - .create(&conversation, &sender, &sent, body) - .await?; + let message = History::begin(&conversation, &sender, body, sent); + + // This filter technically includes every event in the history, but it's easier to follow if + // the various event-manipulating app methods are consistent, and it's harmless to have an + // always-satisfied filter. + let events = message.events().filter(Sequence::start_from(sent)); + tx.messages().record_events(events.clone()).await?; + tx.commit().await?; self.events - .broadcast(message.events().map(Event::from).collect::<Vec<_>>()); + .broadcast(events.map(Event::from).collect::<Vec<_>>()); Ok(message.as_sent()) } @@ -71,38 +75,25 @@ impl<'a> Messages<'a> { deleted_at: &DateTime, ) -> Result<(), DeleteError> { let message_not_found = || DeleteError::MessageNotFound(message.clone()); - let message_deleted = || DeleteError::Deleted(message.clone()); - let deleter_not_found = || DeleteError::UserNotFound(deleted_by.id.clone().into()); - let deleter_deleted = || DeleteError::UserDeleted(deleted_by.id.clone().into()); let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into()); let mut tx = self.db.begin().await?; + let message = tx .messages() .by_id(message) .await .not_found(message_not_found)?; - let deleted_by = tx - .users() - .by_login(deleted_by) - .await - .not_found(deleter_not_found)?; - - let deleted = tx.sequence().next(deleted_at).await?; - let message = message.as_of(deleted).ok_or_else(message_deleted)?; - let deleted_by = deleted_by.as_of(deleted).ok_or_else(deleter_deleted)?; + if message.sender() == &deleted_by.id { + let deleted_at = tx.sequence().next(deleted_at).await?; + let message = message.delete(deleted_at)?; - if message.sender == deleted_by.id { - let message = tx.messages().delete(&message, &deleted).await?; + let events = message.events().filter(Sequence::start_from(deleted_at)); + tx.messages().record_events(events.clone()).await?; tx.commit().await?; - self.events.broadcast( - message - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from) - .collect::<Vec<_>>(), - ); + self.events + .broadcast(events.map(Event::from).collect::<Vec<_>>()); Ok(()) } else { @@ -120,13 +111,16 @@ impl<'a> Messages<'a> { let mut events = Vec::with_capacity(expired.len()); for message in expired { let deleted = tx.sequence().next(relative_to).await?; - if let Some(message) = message.as_of(deleted) { - let message = tx.messages().delete(&message, &deleted).await?; - events.push( - message + match message.delete(deleted) { + Ok(message) => { + let message_events = message .events() - .filter(Sequence::start_from(deleted.sequence)), - ); + .filter(Sequence::start_from(deleted.sequence)); + tx.messages().record_events(message_events.clone()).await?; + + events.push(message_events); + } + Err(history::DeleteError::Deleted(_)) => {} } } @@ -195,10 +189,6 @@ impl From<user::repo::LoadError> for SendError { pub enum DeleteError { #[error("message {0} not found")] MessageNotFound(Id), - #[error("user {0} not found")] - UserNotFound(user::Id), - #[error("user {0} deleted")] - UserDeleted(user::Id), #[error("user {0} not the message's sender")] NotSender(user::Id), #[error("message {0} deleted")] @@ -218,3 +208,11 @@ impl From<user::repo::LoadError> for DeleteError { } } } + +impl From<history::DeleteError> for DeleteError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(message) => Self::Deleted(message.id().clone()), + } + } +} diff --git a/src/message/handlers/delete/mod.rs b/src/message/handlers/delete/mod.rs index 606f502..3e9a212 100644 --- a/src/message/handlers/delete/mod.rs +++ b/src/message/handlers/delete/mod.rs @@ -51,10 +51,9 @@ impl IntoResponse for Error { DeleteError::MessageNotFound(_) | DeleteError::Deleted(_) => { NotFound(error).into_response() } - DeleteError::UserNotFound(_) - | DeleteError::UserDeleted(_) - | DeleteError::Database(_) - | DeleteError::Name(_) => Internal::from(error).into_response(), + DeleteError::Database(_) | DeleteError::Name(_) => { + Internal::from(error).into_response() + } } } } diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs index d0e1794..05d9344 100644 --- a/src/message/handlers/delete/test.rs +++ b/src/message/handlers/delete/test.rs @@ -70,23 +70,23 @@ pub async fn delete_deleted() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; + let message = + fixtures::message::send(&app, &conversation, &sender.login, &fixtures::now()).await; app.messages() - .delete(&sender, &message.id, &fixtures::now()) + .delete(&sender.login, &message.id, &fixtures::now()) .await .expect("deleting a recently-sent message succeeds"); // Send the request - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let super::Error(error) = super::handler( State(app.clone()), Path(message.id.clone()), fixtures::now(), - deleter, + sender, ) .await .expect_err("deleting a deleted message fails"); @@ -101,9 +101,10 @@ pub async fn delete_expired() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::user::create(&app, &fixtures::ancient()).await; + let sender = fixtures::identity::create(&app, &fixtures::ancient()).await; let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; + let message = + fixtures::message::send(&app, &conversation, &sender.login, &fixtures::ancient()).await; app.messages() .expire(&fixtures::now()) @@ -112,12 +113,11 @@ pub async fn delete_expired() { // Send the request - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let super::Error(error) = super::handler( State(app.clone()), Path(message.id.clone()), fixtures::now(), - deleter, + sender, ) .await .expect_err("deleting an expired message fails"); diff --git a/src/message/history.rs b/src/message/history.rs index 2abdf2c..92cecc9 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -1,18 +1,67 @@ use itertools::Itertools as _; use super::{ - Message, + Body, Id, Message, event::{Deleted, Event, Sent}, }; -use crate::event::Sequence; +use crate::{ + conversation::Conversation, + event::{Instant, Sequence}, + user::{self, User}, +}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { pub message: Message, } +// Lifecycle interface +impl History { + pub fn begin(conversation: &Conversation, sender: &User, body: &Body, sent: Instant) -> Self { + Self { + message: Message { + id: Id::generate(), + conversation: conversation.id.clone(), + sender: sender.id.clone(), + body: body.clone(), + sent, + deleted: None, + }, + } + } + + pub fn delete(self, deleted: Instant) -> Result<Self, DeleteError> { + if self.message.deleted.is_none() { + Ok(Self { + message: Message { + deleted: Some(deleted), + ..self.message + }, + }) + } else { + Err(DeleteError::Deleted(self.into())) + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("message {} already deleted", .0.message.id)] + // Payload is boxed here to avoid copying an entire `History` around in any errors this error + // gets chained into. See <https://rust-lang.github.io/rust-clippy/master/index.html#result_large_err>. + Deleted(Box<History>), +} + // State interface impl History { + pub fn id(&self) -> &Id { + &self.message.id + } + + pub fn sender(&self) -> &user::Id { + &self.message.sender + } + // Snapshot of this message as it was when sent. (Note to the future: it's okay // if this returns a redacted or modified version of the message. If we // implement message editing by redacting the original body, then this should @@ -30,15 +79,16 @@ impl History { .filter(Sequence::up_to(sequence.into())) .collect() } - - // Snapshot of this message as of all events recorded in this history. - pub fn as_snapshot(&self) -> Option<Message> { - self.events().collect() - } } // Events interface impl History { + pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> { + [self.sent()] + .into_iter() + .merge_by(self.deleted(), Sequence::merge) + } + fn sent(&self) -> Event { Sent { message: self.message.clone(), @@ -55,10 +105,4 @@ impl History { .into() }) } - - pub fn events(&self) -> impl Iterator<Item = Event> + use<> { - [self.sent()] - .into_iter() - .merge_by(self.deleted(), Sequence::merge) - } } diff --git a/src/message/repo.rs b/src/message/repo.rs index 83bf0d5..4f66bdc 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -1,11 +1,14 @@ use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; -use super::{Body, History, Id, snapshot::Message}; +use super::{ + Body, Event, History, Id, Message, + event::{Deleted, Sent}, +}; use crate::{ clock::DateTime, - conversation::{self, Conversation}, + conversation, event::{Instant, Sequence}, - user::{self, User}, + user, }; pub trait Provider { @@ -21,50 +24,84 @@ impl Provider for Transaction<'_, Sqlite> { pub struct Messages<'t>(&'t mut SqliteConnection); impl Messages<'_> { - pub async fn create( + pub async fn record_events( &mut self, - conversation: &Conversation, - sender: &User, - sent: &Instant, - body: &Body, - ) -> Result<History, sqlx::Error> { - let id = Id::generate(); + events: impl IntoIterator<Item = Event>, + ) -> Result<(), sqlx::Error> { + for event in events { + self.record_event(&event).await?; + } + Ok(()) + } - let message = sqlx::query!( + pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { + match event { + Event::Sent(sent) => self.record_sent(sent).await, + Event::Deleted(deleted) => self.record_deleted(deleted).await, + } + } + + async fn record_sent(&mut self, sent: &Sent) -> Result<(), sqlx::Error> { + let Message { + id, + conversation, + sender, + body, + sent, + deleted: _, + } = &sent.message; + + sqlx::query!( r#" insert into message - (id, conversation, sender, sent_at, sent_sequence, body, last_sequence) - values ($1, $2, $3, $4, $5, $6, $7) - returning - id as "id: Id", - conversation as "conversation: conversation::Id", - sender as "sender: user::Id", - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence", - body as "body: Body" + (id, conversation, sender, body, sent_at, sent_sequence, last_sequence) + values ($1, $2, $3, $4, $5, $6, $6) "#, id, - conversation.id, - sender.id, - sent.at, - sent.sequence, + conversation, + sender, body, + sent.at, sent.sequence, ) - .map(|row| History { - message: Message { - sent: Instant::new(row.sent_at, row.sent_sequence), - conversation: row.conversation, - sender: row.sender, - id: row.id, - body: row.body.unwrap_or_default(), - deleted: None, - }, - }) - .fetch_one(&mut *self.0) + .execute(&mut *self.0) .await?; - Ok(message) + Ok(()) + } + + async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> { + let Deleted { instant, id } = deleted; + + sqlx::query!( + r#" + insert into message_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + instant.at, + instant.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a message is deleted, its body is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at messages in the conversation, cannot retrieve the + // "deleted" message by ignoring the deletion event. + sqlx::query!( + r#" + update message + set body = '', last_sequence = max(last_sequence, $1) + where id = $2 + "#, + instant.sequence, + id, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) } pub async fn live( @@ -178,45 +215,6 @@ impl Messages<'_> { Ok(message) } - pub async fn delete( - &mut self, - message: &Message, - deleted: &Instant, - ) -> Result<History, sqlx::Error> { - sqlx::query!( - r#" - insert into message_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - message.id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a message is deleted, its body is - // retconned to have been the empty string. Someone reading the event stream - // afterwards, or looking at messages in the conversation, cannot retrieve the - // "deleted" message by ignoring the deletion event. - sqlx::query!( - r#" - update message - set body = '', last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - message.id, - ) - .fetch_one(&mut *self.0) - .await?; - - let message = self.by_id(&message.id).await?; - - Ok(message) - } - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let messages = sqlx::query_scalar!( r#" |
