From 1e0493f079d011df56fe2ec93c44a0fea38f0531 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 03:17:02 -0400 Subject: Store `Message` instances using their events. I found a test bug! The tests for deleting previously-deleted or previously-expired tests were using the wrong user to try to delete those messages. The tests happened to pass anyways because the message authorship check was done after the message lifecycle check. They would have no longer passed; the tests are fixed to use the sender, instead. --- src/message/app.rs | 72 ++++++++++++++++++++++++++---------------------------- 1 file changed, 35 insertions(+), 37 deletions(-) (limited to 'src/message/app.rs') diff --git a/src/message/app.rs b/src/message/app.rs index 9100224..f0a62d0 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -2,7 +2,7 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{Body, Id, Message, repo::Provider as _}; +use super::{Body, History, Id, Message, history, repo::Provider as _}; use crate::{ clock::DateTime, conversation::{self, repo::Provider as _}, @@ -52,14 +52,18 @@ impl<'a> Messages<'a> { let sent = tx.sequence().next(sent_at).await?; let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?; let sender = sender.as_of(sent).ok_or_else(sender_deleted)?; - let message = tx - .messages() - .create(&conversation, &sender, &sent, body) - .await?; + let message = History::begin(&conversation, &sender, body, sent); + + // This filter technically includes every event in the history, but it's easier to follow if + // the various event-manipulating app methods are consistent, and it's harmless to have an + // always-satisfied filter. + let events = message.events().filter(Sequence::start_from(sent)); + tx.messages().record_events(events.clone()).await?; + tx.commit().await?; self.events - .broadcast(message.events().map(Event::from).collect::>()); + .broadcast(events.map(Event::from).collect::>()); Ok(message.as_sent()) } @@ -71,38 +75,25 @@ impl<'a> Messages<'a> { deleted_at: &DateTime, ) -> Result<(), DeleteError> { let message_not_found = || DeleteError::MessageNotFound(message.clone()); - let message_deleted = || DeleteError::Deleted(message.clone()); - let deleter_not_found = || DeleteError::UserNotFound(deleted_by.id.clone().into()); - let deleter_deleted = || DeleteError::UserDeleted(deleted_by.id.clone().into()); let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into()); let mut tx = self.db.begin().await?; + let message = tx .messages() .by_id(message) .await .not_found(message_not_found)?; - let deleted_by = tx - .users() - .by_login(deleted_by) - .await - .not_found(deleter_not_found)?; - - let deleted = tx.sequence().next(deleted_at).await?; - let message = message.as_of(deleted).ok_or_else(message_deleted)?; - let deleted_by = deleted_by.as_of(deleted).ok_or_else(deleter_deleted)?; + if message.sender() == &deleted_by.id { + let deleted_at = tx.sequence().next(deleted_at).await?; + let message = message.delete(deleted_at)?; - if message.sender == deleted_by.id { - let message = tx.messages().delete(&message, &deleted).await?; + let events = message.events().filter(Sequence::start_from(deleted_at)); + tx.messages().record_events(events.clone()).await?; tx.commit().await?; - self.events.broadcast( - message - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from) - .collect::>(), - ); + self.events + .broadcast(events.map(Event::from).collect::>()); Ok(()) } else { @@ -120,13 +111,16 @@ impl<'a> Messages<'a> { let mut events = Vec::with_capacity(expired.len()); for message in expired { let deleted = tx.sequence().next(relative_to).await?; - if let Some(message) = message.as_of(deleted) { - let message = tx.messages().delete(&message, &deleted).await?; - events.push( - message + match message.delete(deleted) { + Ok(message) => { + let message_events = message .events() - .filter(Sequence::start_from(deleted.sequence)), - ); + .filter(Sequence::start_from(deleted.sequence)); + tx.messages().record_events(message_events.clone()).await?; + + events.push(message_events); + } + Err(history::DeleteError::Deleted(_)) => {} } } @@ -195,10 +189,6 @@ impl From for SendError { pub enum DeleteError { #[error("message {0} not found")] MessageNotFound(Id), - #[error("user {0} not found")] - UserNotFound(user::Id), - #[error("user {0} deleted")] - UserDeleted(user::Id), #[error("user {0} not the message's sender")] NotSender(user::Id), #[error("message {0} deleted")] @@ -218,3 +208,11 @@ impl From for DeleteError { } } } + +impl From for DeleteError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(message) => Self::Deleted(message.id().clone()), + } + } +} -- cgit v1.2.3 From 17c38585fc2623a6b0196146cf6b6df9955ce979 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 18:35:54 -0400 Subject: Consolidate `events.map(…).collect()` calls into `Broadcaster`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This conversion, from an iterator of type-specific events (say, `user::Event` or `message::Event`), into a `Vec`, is prevasive, and it needs to be done each time. Having Broadcaster expose a support method for this cuts down on the repetition, at the cost of a slightly alarming amount of type-system nonsense in `broadcast_from`. Historical footnote: the internal message structure is a Vec and not an individual message so that bulk operations, like expiring channels and messages, won't disconnect everyone if they happen to dispatch more than sixteen messages (current queue depth limit) at once. We trade allocation and memory pressure for keeping the connections alive. _Most_ event publishing is an iterator of one item, so the Vec allocation is redundant. --- src/broadcast.rs | 22 ++++++++++++++++++++-- src/conversation/app.rs | 17 +++++------------ src/login/app.rs | 7 ++++--- src/message/app.rs | 17 +++++------------ src/token/app.rs | 7 ++++--- src/user/create.rs | 2 +- 6 files changed, 39 insertions(+), 33 deletions(-) (limited to 'src/message/app.rs') diff --git a/src/broadcast.rs b/src/broadcast.rs index 6e1f04d..ee42c08 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -30,7 +30,7 @@ impl Broadcaster where M: Clone + Send + std::fmt::Debug + 'static, { - pub fn broadcast(&self, message: impl Into) { + pub fn broadcast(&self, message: M) { let tx = self.sender(); // Per the Tokio docs, the returned error is only used to indicate that @@ -40,7 +40,25 @@ where // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message.into()); + let _ = tx.send(message); + } + + // If `M` is a type that can be obtained from an iterator, such as a `Vec`, and if `I` is an + // iterable of items that can be collected into `M`, then this will construct an `M` from the + // passed event iterator, converting each element as it goes. This emits one message (as `M`), + // containing whatever we collect out of `messages`. + // + // This is mostly meant for handling synchronized entity events, which tend to be generated as + // iterables of domain-specific event types, like `user::Event`, but broadcast as `Vec` + // for consumption by outside clients. + pub fn broadcast_from(&self, messages: I) + where + I: IntoIterator, + M: FromIterator, + E: From, + { + let message = messages.into_iter().map(Into::into).collect(); + self.broadcast(message); } pub fn subscribe(&self) -> impl Stream + std::fmt::Debug + use { diff --git a/src/conversation/app.rs b/src/conversation/app.rs index 5e07292..26886af 100644 --- a/src/conversation/app.rs +++ b/src/conversation/app.rs @@ -10,7 +10,7 @@ use super::{ use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, - event::{Broadcaster, Event, Sequence, repo::Provider as _}, + event::{Broadcaster, Sequence, repo::Provider as _}, message::repo::Provider as _, name::{self, Name}, }; @@ -49,8 +49,7 @@ impl<'a> Conversations<'a> { tx.commit().await?; - self.events - .broadcast(events.map(Event::from).collect::>()); + self.events.broadcast_from(events); Ok(conversation.as_created()) } @@ -105,8 +104,7 @@ impl<'a> Conversations<'a> { tx.commit().await?; - self.events - .broadcast(events.map(Event::from).collect::>()); + self.events.broadcast_from(events); Ok(()) } @@ -134,13 +132,8 @@ impl<'a> Conversations<'a> { tx.commit().await?; - self.events.broadcast( - events - .into_iter() - .kmerge_by(Sequence::merge) - .map(Event::from) - .collect::>(), - ); + self.events + .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); Ok(()) } diff --git a/src/login/app.rs b/src/login/app.rs index 77d4ac3..e471000 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -80,9 +80,10 @@ impl<'a> Logins<'a> { tx.tokens().create(&token, &secret).await?; tx.commit().await?; - for event in revoked.into_iter().map(TokenEvent::Revoked) { - self.token_events.broadcast(event); - } + revoked + .into_iter() + .map(TokenEvent::Revoked) + .for_each(|event| self.token_events.broadcast(event)); Ok(secret) } else { diff --git a/src/message/app.rs b/src/message/app.rs index f0a62d0..647152e 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -7,7 +7,7 @@ use crate::{ clock::DateTime, conversation::{self, repo::Provider as _}, db::NotFound as _, - event::{Broadcaster, Event, Sequence, repo::Provider as _}, + event::{Broadcaster, Sequence, repo::Provider as _}, login::Login, name, user::{self, repo::Provider as _}, @@ -62,8 +62,7 @@ impl<'a> Messages<'a> { tx.commit().await?; - self.events - .broadcast(events.map(Event::from).collect::>()); + self.events.broadcast_from(events); Ok(message.as_sent()) } @@ -92,8 +91,7 @@ impl<'a> Messages<'a> { tx.messages().record_events(events.clone()).await?; tx.commit().await?; - self.events - .broadcast(events.map(Event::from).collect::>()); + self.events.broadcast_from(events); Ok(()) } else { @@ -126,13 +124,8 @@ impl<'a> Messages<'a> { tx.commit().await?; - self.events.broadcast( - events - .into_iter() - .kmerge_by(Sequence::merge) - .map(Event::from) - .collect::>(), - ); + self.events + .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); Ok(()) } diff --git a/src/token/app.rs b/src/token/app.rs index fb5d712..1d68f32 100644 --- a/src/token/app.rs +++ b/src/token/app.rs @@ -102,9 +102,10 @@ impl<'a> Tokens<'a> { let tokens = tx.tokens().expire(&expire_at).await?; tx.commit().await?; - for event in tokens.into_iter().map(TokenEvent::Revoked) { - self.token_events.broadcast(event); - } + tokens + .into_iter() + .map(TokenEvent::Revoked) + .for_each(|event| self.token_events.broadcast(event)); Ok(()) } diff --git a/src/user/create.rs b/src/user/create.rs index 21c61d1..d6656e5 100644 --- a/src/user/create.rs +++ b/src/user/create.rs @@ -99,7 +99,7 @@ where login: _, } = self; - events.broadcast(user_events.into_iter().collect::>()); + events.broadcast_from(user_events); } pub fn login(&self) -> &Login { -- cgit v1.2.3