From a54c548bf00f881f36d2adc3a6a2614b5f72f9ce Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 02:23:55 -0400 Subject: Allow callers to pass `Instant`s to `Sequence` predicate constructors. --- src/conversation/history.rs | 4 +--- src/event/sequence.rs | 12 +++++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/conversation/history.rs b/src/conversation/history.rs index 746a1b0..8821277 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -30,9 +30,7 @@ impl History { where S: Into, { - self.events() - .filter(Sequence::up_to(sequence.into())) - .collect() + self.events().filter(Sequence::up_to(sequence)).collect() } // Snapshot of this conversation as of all events recorded in this history. diff --git a/src/event/sequence.rs b/src/event/sequence.rs index 77281c2..9a0ea5d 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -50,24 +50,30 @@ impl fmt::Display for Sequence { } impl Sequence { - pub fn up_to(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool + pub fn up_to(resume_point: P) -> impl for<'e> Fn(&'e E) -> bool + Clone where + P: Into, E: Sequenced, { + let resume_point = resume_point.into(); move |event| event.sequence() <= resume_point } - pub fn after(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool + pub fn after(resume_point: P) -> impl for<'e> Fn(&'e E) -> bool + Clone where + P: Into, E: Sequenced, { + let resume_point = resume_point.into(); move |event| resume_point < event.sequence() } - pub fn start_from(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool + pub fn start_from(resume_point: P) -> impl for<'e> Fn(&'e E) -> bool + Clone where + P: Into, E: Sequenced, { + let resume_point = resume_point.into(); move |event| resume_point <= event.sequence() } -- cgit v1.2.3 From ca4ac1d0f12532c38d4041aba6ae50ae4093ae13 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 02:25:57 -0400 Subject: Store `Conversation` 
instances using their events. This replaces the approach of having the repo type know about conversation lifecycle in detail. Instead, the repo type accepts events and applies them to the DB blindly. The SQL written to implement each event does, however, embed assumptions about what order events will happen in. --- ...cf58b1689d916fd1c209645c74e17a8da2102eada3.json | 12 ++ ...a9bde38547e626387dfe5b859f02ae1dbe171d5741.json | 12 -- ...3379a428eb461067b1769b8de60a73e2117a3cd11f.json | 12 ++ ...e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json | 20 --- src/conversation/app.rs | 69 +++++++---- src/conversation/history.rs | 40 +++++- src/conversation/repo.rs | 138 +++++++++++---------- 7 files changed, 178 insertions(+), 125 deletions(-) create mode 100644 .sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json delete mode 100644 .sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json create mode 100644 .sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json delete mode 100644 .sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json (limited to 'src') diff --git a/.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json b/.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json new file mode 100644 index 0000000..46bfea0 --- /dev/null +++ b/.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n update conversation\n set last_sequence = max(last_sequence, $1)\n where id = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + }, + "hash": "17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3" +} diff --git a/.sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json b/.sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json deleted file 
mode 100644 index 9c62ca9..0000000 --- a/.sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert into conversation (id, created_at, created_sequence, last_sequence)\n values ($1, $2, $3, $4)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 4 - }, - "nullable": [] - }, - "hash": "1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741" -} diff --git a/.sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json b/.sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json new file mode 100644 index 0000000..3d33188 --- /dev/null +++ b/.sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n insert into conversation (id, created_at, created_sequence, last_sequence)\n values ($1, $2, $3, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f" +} diff --git a/.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json b/.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json deleted file mode 100644 index 9d212fa..0000000 --- a/.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n update conversation\n set last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - false - ] - }, - "hash": "85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24" -} diff --git a/src/conversation/app.rs b/src/conversation/app.rs index 81ccdcf..30baf77 100644 --- 
a/src/conversation/app.rs +++ b/src/conversation/app.rs @@ -3,7 +3,7 @@ use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{ - Conversation, Id, + Conversation, History, Id, history, repo::{LoadError, Provider as _}, validate, }; @@ -11,7 +11,7 @@ use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, event::{Broadcaster, Event, Sequence, repo::Provider as _}, - message::{self, repo::Provider as _}, + message::repo::Provider as _, name::{self, Name}, }; @@ -36,15 +36,21 @@ impl<'a> Conversations<'a> { let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; - let conversation = tx - .conversations() - .create(name, &created) + let conversation = History::begin(name, created); + + // This filter technically includes every event in the history, but it's easier to follow if + // the various event-manipulating app methods are consistent, and it's harmless to have an + // always-satisfied filter. + let events = conversation.events().filter(Sequence::start_from(created)); + tx.conversations() + .record_events(events.clone()) .await .duplicate(|| CreateError::DuplicateName(name.clone()))?; + tx.commit().await?; self.events - .broadcast(conversation.events().map(Event::from).collect::>()); + .broadcast(events.map(Event::from).collect::>()); Ok(conversation.as_created()) } @@ -78,11 +84,6 @@ impl<'a> Conversations<'a> { .by_id(conversation) .await .not_found(|| DeleteError::NotFound(conversation.clone()))?; - conversation - .as_snapshot() - .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?; - - let mut events = Vec::new(); let messages = tx.messages().live(&conversation).await?; let has_messages = messages @@ -94,17 +95,15 @@ impl<'a> Conversations<'a> { } let deleted = tx.sequence().next(deleted_at).await?; - let conversation = tx.conversations().delete(&conversation, &deleted).await?; - events.extend( - conversation - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from), 
- ); + let conversation = conversation.delete(deleted)?; + + let events = conversation.events().filter(Sequence::start_from(deleted)); + tx.conversations().record_events(events.clone()).await?; tx.commit().await?; - self.events.broadcast(events); + self.events + .broadcast(events.map(Event::from).collect::>()); Ok(()) } @@ -120,12 +119,14 @@ impl<'a> Conversations<'a> { let mut events = Vec::with_capacity(expired.len()); for conversation in expired { let deleted = tx.sequence().next(relative_to).await?; - let conversation = tx.conversations().delete(&conversation, &deleted).await?; - events.push( - conversation - .events() - .filter(Sequence::start_from(deleted.sequence)), - ); + let conversation = conversation.delete(deleted)?; + + let conversation_events = conversation.events().filter(Sequence::start_from(deleted)); + tx.conversations() + .record_events(conversation_events.clone()) + .await?; + + events.push(conversation_events); } tx.commit().await?; @@ -218,8 +219,18 @@ impl From for DeleteError { } } +impl From for DeleteError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), + } + } +} + #[derive(Debug, thiserror::Error)] pub enum ExpireError { + #[error("tried to expire already-deleted conversation: {0}")] + Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] @@ -234,3 +245,11 @@ impl From for ExpireError { } } } + +impl From for ExpireError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), + } + } +} diff --git a/src/conversation/history.rs b/src/conversation/history.rs index 8821277..5cba9ca 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -4,13 +4,49 @@ use super::{ Conversation, Id, event::{Created, Deleted, Event}, }; -use crate::event::Sequence; +use crate::{ + 
event::{Instant, Sequence}, + name::Name, +}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { pub conversation: Conversation, } +// Lifecycle interface +impl History { + pub fn begin(name: &Name, created: Instant) -> Self { + Self { + conversation: Conversation { + id: Id::generate(), + name: name.clone(), + created, + deleted: None, + }, + } + } + + pub fn delete(self, deleted: Instant) -> Result { + if self.conversation.deleted.is_none() { + Ok(Self { + conversation: Conversation { + deleted: Some(deleted), + ..self.conversation + }, + }) + } else { + Err(DeleteError::Deleted(self)) + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("conversation {} already deleted", .0.conversation.id)] + Deleted(History), +} + // State interface impl History { pub fn id(&self) -> &Id { @@ -41,7 +77,7 @@ impl History { // Event factories impl History { - pub fn events(&self) -> impl Iterator + use<> { + pub fn events(&self) -> impl Iterator + Clone + use<> { [self.created()] .into_iter() .merge_by(self.deleted(), Sequence::merge) diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs index 7e38b62..cb66bf8 100644 --- a/src/conversation/repo.rs +++ b/src/conversation/repo.rs @@ -1,9 +1,12 @@ use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; +use super::{ + Conversation, Event, History, Id, + event::{Created, Deleted}, +}; use crate::{ clock::DateTime, - conversation::{Conversation, History, Id}, db::NotFound, event::{Instant, Sequence}, name::{self, Name}, @@ -22,22 +25,41 @@ impl Provider for Transaction<'_, Sqlite> { pub struct Conversations<'t>(&'t mut SqliteConnection); impl Conversations<'_> { - pub async fn create(&mut self, name: &Name, created: &Instant) -> Result { - let id = Id::generate(); - let name = name.clone(); + pub async fn record_events( + &mut self, + events: impl IntoIterator, + ) -> Result<(), sqlx::Error> { + for event in events { + 
self.record_event(&event).await?; + } + Ok(()) + } + + pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { + match event { + Event::Created(created) => self.record_created(created).await, + Event::Deleted(deleted) => self.record_deleted(deleted).await, + } + } + + async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> { + let Conversation { + id, + created, + name, + deleted: _, + } = &created.conversation; let display_name = name.display(); let canonical_name = name.canonical(); - let created = *created; sqlx::query!( r#" insert into conversation (id, created_at, created_sequence, last_sequence) - values ($1, $2, $3, $4) + values ($1, $2, $3, $3) "#, id, created.at, created.sequence, - created.sequence, ) .execute(&mut *self.0) .await?; @@ -54,16 +76,50 @@ impl Conversations<'_> { .execute(&mut *self.0) .await?; - let conversation = History { - conversation: Conversation { - created, - id, - name: name.clone(), - deleted: None, - }, - }; + Ok(()) + } - Ok(conversation) + async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> { + let Deleted { instant, id } = deleted; + sqlx::query!( + r#" + update conversation + set last_sequence = max(last_sequence, $1) + where id = $2 + "#, + instant.sequence, + id, + ) + .execute(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + instant.at, + instant.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a conversation is deleted, its + // name is retconned to have been the empty string. Someone reading the event + // stream afterwards, or looking at conversations via the API, cannot retrieve + // the "deleted" conversation's information by ignoring the deletion event. 
+ sqlx::query!( + r#" + delete from conversation_name + where id = $1 + "#, + id, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) } pub async fn by_id(&mut self, conversation: &Id) -> Result { @@ -179,56 +235,6 @@ impl Conversations<'_> { Ok(conversations) } - pub async fn delete( - &mut self, - conversation: &History, - deleted: &Instant, - ) -> Result { - let id = conversation.id(); - sqlx::query!( - r#" - update conversation - set last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - id, - ) - .fetch_one(&mut *self.0) - .await?; - - sqlx::query!( - r#" - insert into conversation_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a conversation is deleted, its - // name is retconned to have been the empty string. Someone reading the event - // stream afterwards, or looking at conversations via the API, cannot retrieve - // the "deleted" conversation's information by ignoring the deletion event. - sqlx::query!( - r#" - delete from conversation_name - where id = $1 - "#, - id, - ) - .execute(&mut *self.0) - .await?; - - let conversation = self.by_id(id).await?; - - Ok(conversation) - } - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let conversations = sqlx::query_scalar!( r#" -- cgit v1.2.3 From 1e0493f079d011df56fe2ec93c44a0fea38f0531 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 03:17:02 -0400 Subject: Store `Message` instances using their events. I found a test bug! The tests for deleting previously-deleted or previously-expired tests were using the wrong user to try to delete those messages. The tests happened to pass anyways because the message authorship check was done after the message lifecycle check. They would have no longer passed; the tests are fixed to use the sender, instead. 
--- ...d980b7f8cbc8a8339377814ca5f889860da99b1561.json | 50 ------- ...2d5ee0e8b184e5269874b17715e2fa2f1f520bd83f.json | 12 ++ ...b5acc118678830ae380dbed94542e85e294b3d9ace.json | 12 ++ ...3a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json | 20 --- src/conversation/app.rs | 11 +- src/message/app.rs | 72 +++++----- src/message/handlers/delete/mod.rs | 7 +- src/message/handlers/delete/test.rs | 18 +-- src/message/history.rs | 70 ++++++++-- src/message/repo.rs | 148 ++++++++++----------- 10 files changed, 208 insertions(+), 212 deletions(-) delete mode 100644 .sqlx/query-427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561.json create mode 100644 .sqlx/query-47bf3a66c20225f28c6c7f2d5ee0e8b184e5269874b17715e2fa2f1f520bd83f.json create mode 100644 .sqlx/query-53c5c19f2e284b45b50fe3b5acc118678830ae380dbed94542e85e294b3d9ace.json delete mode 100644 .sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json (limited to 'src') diff --git a/.sqlx/query-427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561.json b/.sqlx/query-427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561.json deleted file mode 100644 index 82db559..0000000 --- a/.sqlx/query-427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert into message\n (id, conversation, sender, sent_at, sent_sequence, body, last_sequence)\n values ($1, $2, $3, $4, $5, $6, $7)\n returning\n id as \"id: Id\",\n conversation as \"conversation: conversation::Id\",\n sender as \"sender: user::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n body as \"body: Body\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "conversation: conversation::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: user::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": 
"sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - } - ], - "parameters": { - "Right": 7 - }, - "nullable": [ - false, - false, - false, - false, - false, - true - ] - }, - "hash": "427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561" -} diff --git a/.sqlx/query-47bf3a66c20225f28c6c7f2d5ee0e8b184e5269874b17715e2fa2f1f520bd83f.json b/.sqlx/query-47bf3a66c20225f28c6c7f2d5ee0e8b184e5269874b17715e2fa2f1f520bd83f.json new file mode 100644 index 0000000..ce70fcd --- /dev/null +++ b/.sqlx/query-47bf3a66c20225f28c6c7f2d5ee0e8b184e5269874b17715e2fa2f1f520bd83f.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n update message\n set body = '', last_sequence = max(last_sequence, $1)\n where id = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + }, + "hash": "47bf3a66c20225f28c6c7f2d5ee0e8b184e5269874b17715e2fa2f1f520bd83f" +} diff --git a/.sqlx/query-53c5c19f2e284b45b50fe3b5acc118678830ae380dbed94542e85e294b3d9ace.json b/.sqlx/query-53c5c19f2e284b45b50fe3b5acc118678830ae380dbed94542e85e294b3d9ace.json new file mode 100644 index 0000000..85e43c8 --- /dev/null +++ b/.sqlx/query-53c5c19f2e284b45b50fe3b5acc118678830ae380dbed94542e85e294b3d9ace.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n insert into message\n (id, conversation, sender, body, sent_at, sent_sequence, last_sequence)\n values ($1, $2, $3, $4, $5, $6, $6)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 6 + }, + "nullable": [] + }, + "hash": "53c5c19f2e284b45b50fe3b5acc118678830ae380dbed94542e85e294b3d9ace" +} diff --git a/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json b/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json deleted file mode 100644 index 5179e74..0000000 --- 
a/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n update message\n set body = '', last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - false - ] - }, - "hash": "64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4" -} diff --git a/src/conversation/app.rs b/src/conversation/app.rs index 30baf77..5e07292 100644 --- a/src/conversation/app.rs +++ b/src/conversation/app.rs @@ -86,18 +86,21 @@ impl<'a> Conversations<'a> { .not_found(|| DeleteError::NotFound(conversation.clone()))?; let messages = tx.messages().live(&conversation).await?; + let deleted_at = tx.sequence().next(deleted_at).await?; + let has_messages = messages .iter() - .map(message::History::as_snapshot) + .map(|message| message.as_of(deleted_at)) .any(|message| message.is_some()); if has_messages { return Err(DeleteError::NotEmpty(conversation.id().clone())); } - let deleted = tx.sequence().next(deleted_at).await?; - let conversation = conversation.delete(deleted)?; + let conversation = conversation.delete(deleted_at)?; - let events = conversation.events().filter(Sequence::start_from(deleted)); + let events = conversation + .events() + .filter(Sequence::start_from(deleted_at)); tx.conversations().record_events(events.clone()).await?; tx.commit().await?; diff --git a/src/message/app.rs b/src/message/app.rs index 9100224..f0a62d0 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -2,7 +2,7 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{Body, Id, Message, repo::Provider as _}; +use super::{Body, History, Id, Message, history, repo::Provider as _}; use crate::{ clock::DateTime, conversation::{self, repo::Provider as _}, @@ -52,14 +52,18 @@ impl<'a> 
Messages<'a> { let sent = tx.sequence().next(sent_at).await?; let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?; let sender = sender.as_of(sent).ok_or_else(sender_deleted)?; - let message = tx - .messages() - .create(&conversation, &sender, &sent, body) - .await?; + let message = History::begin(&conversation, &sender, body, sent); + + // This filter technically includes every event in the history, but it's easier to follow if + // the various event-manipulating app methods are consistent, and it's harmless to have an + // always-satisfied filter. + let events = message.events().filter(Sequence::start_from(sent)); + tx.messages().record_events(events.clone()).await?; + tx.commit().await?; self.events - .broadcast(message.events().map(Event::from).collect::>()); + .broadcast(events.map(Event::from).collect::>()); Ok(message.as_sent()) } @@ -71,38 +75,25 @@ impl<'a> Messages<'a> { deleted_at: &DateTime, ) -> Result<(), DeleteError> { let message_not_found = || DeleteError::MessageNotFound(message.clone()); - let message_deleted = || DeleteError::Deleted(message.clone()); - let deleter_not_found = || DeleteError::UserNotFound(deleted_by.id.clone().into()); - let deleter_deleted = || DeleteError::UserDeleted(deleted_by.id.clone().into()); let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into()); let mut tx = self.db.begin().await?; + let message = tx .messages() .by_id(message) .await .not_found(message_not_found)?; - let deleted_by = tx - .users() - .by_login(deleted_by) - .await - .not_found(deleter_not_found)?; - - let deleted = tx.sequence().next(deleted_at).await?; - let message = message.as_of(deleted).ok_or_else(message_deleted)?; - let deleted_by = deleted_by.as_of(deleted).ok_or_else(deleter_deleted)?; + if message.sender() == &deleted_by.id { + let deleted_at = tx.sequence().next(deleted_at).await?; + let message = message.delete(deleted_at)?; - if message.sender == deleted_by.id { - let message = 
tx.messages().delete(&message, &deleted).await?; + let events = message.events().filter(Sequence::start_from(deleted_at)); + tx.messages().record_events(events.clone()).await?; tx.commit().await?; - self.events.broadcast( - message - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from) - .collect::>(), - ); + self.events + .broadcast(events.map(Event::from).collect::>()); Ok(()) } else { @@ -120,13 +111,16 @@ impl<'a> Messages<'a> { let mut events = Vec::with_capacity(expired.len()); for message in expired { let deleted = tx.sequence().next(relative_to).await?; - if let Some(message) = message.as_of(deleted) { - let message = tx.messages().delete(&message, &deleted).await?; - events.push( - message + match message.delete(deleted) { + Ok(message) => { + let message_events = message .events() - .filter(Sequence::start_from(deleted.sequence)), - ); + .filter(Sequence::start_from(deleted.sequence)); + tx.messages().record_events(message_events.clone()).await?; + + events.push(message_events); + } + Err(history::DeleteError::Deleted(_)) => {} } } @@ -195,10 +189,6 @@ impl From for SendError { pub enum DeleteError { #[error("message {0} not found")] MessageNotFound(Id), - #[error("user {0} not found")] - UserNotFound(user::Id), - #[error("user {0} deleted")] - UserDeleted(user::Id), #[error("user {0} not the message's sender")] NotSender(user::Id), #[error("message {0} deleted")] @@ -218,3 +208,11 @@ impl From for DeleteError { } } } + +impl From for DeleteError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(message) => Self::Deleted(message.id().clone()), + } + } +} diff --git a/src/message/handlers/delete/mod.rs b/src/message/handlers/delete/mod.rs index 606f502..3e9a212 100644 --- a/src/message/handlers/delete/mod.rs +++ b/src/message/handlers/delete/mod.rs @@ -51,10 +51,9 @@ impl IntoResponse for Error { DeleteError::MessageNotFound(_) | DeleteError::Deleted(_) => { 
NotFound(error).into_response() } - DeleteError::UserNotFound(_) - | DeleteError::UserDeleted(_) - | DeleteError::Database(_) - | DeleteError::Name(_) => Internal::from(error).into_response(), + DeleteError::Database(_) | DeleteError::Name(_) => { + Internal::from(error).into_response() + } } } } diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs index d0e1794..05d9344 100644 --- a/src/message/handlers/delete/test.rs +++ b/src/message/handlers/delete/test.rs @@ -70,23 +70,23 @@ pub async fn delete_deleted() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; + let message = + fixtures::message::send(&app, &conversation, &sender.login, &fixtures::now()).await; app.messages() - .delete(&sender, &message.id, &fixtures::now()) + .delete(&sender.login, &message.id, &fixtures::now()) .await .expect("deleting a recently-sent message succeeds"); // Send the request - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let super::Error(error) = super::handler( State(app.clone()), Path(message.id.clone()), fixtures::now(), - deleter, + sender, ) .await .expect_err("deleting a deleted message fails"); @@ -101,9 +101,10 @@ pub async fn delete_expired() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::user::create(&app, &fixtures::ancient()).await; + let sender = fixtures::identity::create(&app, &fixtures::ancient()).await; let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; + let message = + 
fixtures::message::send(&app, &conversation, &sender.login, &fixtures::ancient()).await; app.messages() .expire(&fixtures::now()) @@ -112,12 +113,11 @@ pub async fn delete_expired() { // Send the request - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let super::Error(error) = super::handler( State(app.clone()), Path(message.id.clone()), fixtures::now(), - deleter, + sender, ) .await .expect_err("deleting an expired message fails"); diff --git a/src/message/history.rs b/src/message/history.rs index 2abdf2c..92cecc9 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -1,18 +1,67 @@ use itertools::Itertools as _; use super::{ - Message, + Body, Id, Message, event::{Deleted, Event, Sent}, }; -use crate::event::Sequence; +use crate::{ + conversation::Conversation, + event::{Instant, Sequence}, + user::{self, User}, +}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { pub message: Message, } +// Lifecycle interface +impl History { + pub fn begin(conversation: &Conversation, sender: &User, body: &Body, sent: Instant) -> Self { + Self { + message: Message { + id: Id::generate(), + conversation: conversation.id.clone(), + sender: sender.id.clone(), + body: body.clone(), + sent, + deleted: None, + }, + } + } + + pub fn delete(self, deleted: Instant) -> Result { + if self.message.deleted.is_none() { + Ok(Self { + message: Message { + deleted: Some(deleted), + ..self.message + }, + }) + } else { + Err(DeleteError::Deleted(self.into())) + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("message {} already deleted", .0.message.id)] + // Payload is boxed here to avoid copying an entire `History` around in any errors this error + // gets chained into. See . + Deleted(Box), +} + // State interface impl History { + pub fn id(&self) -> &Id { + &self.message.id + } + + pub fn sender(&self) -> &user::Id { + &self.message.sender + } + // Snapshot of this message as it was when sent. 
(Note to the future: it's okay // if this returns a redacted or modified version of the message. If we // implement message editing by redacting the original body, then this should @@ -30,15 +79,16 @@ impl History { .filter(Sequence::up_to(sequence.into())) .collect() } - - // Snapshot of this message as of all events recorded in this history. - pub fn as_snapshot(&self) -> Option { - self.events().collect() - } } // Events interface impl History { + pub fn events(&self) -> impl Iterator + Clone + use<> { + [self.sent()] + .into_iter() + .merge_by(self.deleted(), Sequence::merge) + } + fn sent(&self) -> Event { Sent { message: self.message.clone(), @@ -55,10 +105,4 @@ impl History { .into() }) } - - pub fn events(&self) -> impl Iterator + use<> { - [self.sent()] - .into_iter() - .merge_by(self.deleted(), Sequence::merge) - } } diff --git a/src/message/repo.rs b/src/message/repo.rs index 83bf0d5..4f66bdc 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -1,11 +1,14 @@ use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; -use super::{Body, History, Id, snapshot::Message}; +use super::{ + Body, Event, History, Id, Message, + event::{Deleted, Sent}, +}; use crate::{ clock::DateTime, - conversation::{self, Conversation}, + conversation, event::{Instant, Sequence}, - user::{self, User}, + user, }; pub trait Provider { @@ -21,50 +24,84 @@ impl Provider for Transaction<'_, Sqlite> { pub struct Messages<'t>(&'t mut SqliteConnection); impl Messages<'_> { - pub async fn create( + pub async fn record_events( &mut self, - conversation: &Conversation, - sender: &User, - sent: &Instant, - body: &Body, - ) -> Result { - let id = Id::generate(); + events: impl IntoIterator, + ) -> Result<(), sqlx::Error> { + for event in events { + self.record_event(&event).await?; + } + Ok(()) + } - let message = sqlx::query!( + pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { + match event { + Event::Sent(sent) => self.record_sent(sent).await, + 
Event::Deleted(deleted) => self.record_deleted(deleted).await, + } + } + + async fn record_sent(&mut self, sent: &Sent) -> Result<(), sqlx::Error> { + let Message { + id, + conversation, + sender, + body, + sent, + deleted: _, + } = &sent.message; + + sqlx::query!( r#" insert into message - (id, conversation, sender, sent_at, sent_sequence, body, last_sequence) - values ($1, $2, $3, $4, $5, $6, $7) - returning - id as "id: Id", - conversation as "conversation: conversation::Id", - sender as "sender: user::Id", - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence", - body as "body: Body" + (id, conversation, sender, body, sent_at, sent_sequence, last_sequence) + values ($1, $2, $3, $4, $5, $6, $6) "#, id, - conversation.id, - sender.id, - sent.at, - sent.sequence, + conversation, + sender, body, + sent.at, sent.sequence, ) - .map(|row| History { - message: Message { - sent: Instant::new(row.sent_at, row.sent_sequence), - conversation: row.conversation, - sender: row.sender, - id: row.id, - body: row.body.unwrap_or_default(), - deleted: None, - }, - }) - .fetch_one(&mut *self.0) + .execute(&mut *self.0) .await?; - Ok(message) + Ok(()) + } + + async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> { + let Deleted { instant, id } = deleted; + + sqlx::query!( + r#" + insert into message_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + instant.at, + instant.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a message is deleted, its body is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at messages in the conversation, cannot retrieve the + // "deleted" message by ignoring the deletion event. 
+ sqlx::query!( + r#" + update message + set body = '', last_sequence = max(last_sequence, $1) + where id = $2 + "#, + instant.sequence, + id, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) } pub async fn live( @@ -178,45 +215,6 @@ impl Messages<'_> { Ok(message) } - pub async fn delete( - &mut self, - message: &Message, - deleted: &Instant, - ) -> Result { - sqlx::query!( - r#" - insert into message_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - message.id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a message is deleted, its body is - // retconned to have been the empty string. Someone reading the event stream - // afterwards, or looking at messages in the conversation, cannot retrieve the - // "deleted" message by ignoring the deletion event. - sqlx::query!( - r#" - update message - set body = '', last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - message.id, - ) - .fetch_one(&mut *self.0) - .await?; - - let message = self.by_id(&message.id).await?; - - Ok(message) - } - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let messages = sqlx::query_scalar!( r#" -- cgit v1.2.3 From a2fee1c18d9def1486a570fb3c98db5372c51238 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 18:18:24 -0400 Subject: Store `User` instances using their events. 
--- ...81f11f2217c3dcfe768b08c8733d3c9a00e9c0c7a7.json | 12 ++++++ ...dd2926baf08b119ec169fc6a3d8b507b9701526e69.json | 44 +++++++++++++++++++ ...7a6bbb9dfe26bce0812a215573a276043095cd872c.json | 44 ------------------- ...b58a7e1dbd65c61badd6a9f8c4e01c12b6c2a3f3b6.json | 44 +++++++++++++++++++ ...2b07cc6e9f69c64bb790d6c52ad84872f256c749aa.json | 44 ------------------- ...0475c25112ffb2b26584ada60c324fc2f945c3d2fa.json | 12 ------ src/user/create.rs | 39 +++++++++++------ src/user/history.rs | 22 +++++++++- src/user/repo.rs | 50 +++++++++++++--------- 9 files changed, 176 insertions(+), 135 deletions(-) create mode 100644 .sqlx/query-1feaf96621fff37a456dcd81f11f2217c3dcfe768b08c8733d3c9a00e9c0c7a7.json create mode 100644 .sqlx/query-2fefbfc13bbc92fd69a1e2dd2926baf08b119ec169fc6a3d8b507b9701526e69.json delete mode 100644 .sqlx/query-8dae7dbe085898659013167a6bbb9dfe26bce0812a215573a276043095cd872c.json create mode 100644 .sqlx/query-9e220610e6e22f4dc5a2b9b58a7e1dbd65c61badd6a9f8c4e01c12b6c2a3f3b6.json delete mode 100644 .sqlx/query-be644101e1fd50880fa7c82b07cc6e9f69c64bb790d6c52ad84872f256c749aa.json delete mode 100644 .sqlx/query-f9a6a39c45c3b039f139da0475c25112ffb2b26584ada60c324fc2f945c3d2fa.json (limited to 'src') diff --git a/.sqlx/query-1feaf96621fff37a456dcd81f11f2217c3dcfe768b08c8733d3c9a00e9c0c7a7.json b/.sqlx/query-1feaf96621fff37a456dcd81f11f2217c3dcfe768b08c8733d3c9a00e9c0c7a7.json new file mode 100644 index 0000000..69c2e47 --- /dev/null +++ b/.sqlx/query-1feaf96621fff37a456dcd81f11f2217c3dcfe768b08c8733d3c9a00e9c0c7a7.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n insert\n into user (id, created_at, created_sequence)\n values ($1, $2, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "1feaf96621fff37a456dcd81f11f2217c3dcfe768b08c8733d3c9a00e9c0c7a7" +} diff --git a/.sqlx/query-2fefbfc13bbc92fd69a1e2dd2926baf08b119ec169fc6a3d8b507b9701526e69.json 
b/.sqlx/query-2fefbfc13bbc92fd69a1e2dd2926baf08b119ec169fc6a3d8b507b9701526e69.json new file mode 100644 index 0000000..0849dbc --- /dev/null +++ b/.sqlx/query-2fefbfc13bbc92fd69a1e2dd2926baf08b119ec169fc6a3d8b507b9701526e69.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n login.display_name as \"display_name: String\",\n login.canonical_name as \"canonical_name: String\",\n user.created_at as \"created_at: DateTime\",\n user.created_sequence as \"created_sequence: Sequence\"\n from user\n join login using (id)\n where user.created_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name: String", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "canonical_name: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "2fefbfc13bbc92fd69a1e2dd2926baf08b119ec169fc6a3d8b507b9701526e69" +} diff --git a/.sqlx/query-8dae7dbe085898659013167a6bbb9dfe26bce0812a215573a276043095cd872c.json b/.sqlx/query-8dae7dbe085898659013167a6bbb9dfe26bce0812a215573a276043095cd872c.json deleted file mode 100644 index cbe1cdf..0000000 --- a/.sqlx/query-8dae7dbe085898659013167a6bbb9dfe26bce0812a215573a276043095cd872c.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n login.display_name as \"display_name: String\",\n login.canonical_name as \"canonical_name: String\",\n user.created_sequence as \"created_sequence: Sequence\",\n user.created_at as \"created_at: DateTime\"\n from user\n join login using (id)\n where user.created_sequence > $1\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - 
"type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "created_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "8dae7dbe085898659013167a6bbb9dfe26bce0812a215573a276043095cd872c" -} diff --git a/.sqlx/query-9e220610e6e22f4dc5a2b9b58a7e1dbd65c61badd6a9f8c4e01c12b6c2a3f3b6.json b/.sqlx/query-9e220610e6e22f4dc5a2b9b58a7e1dbd65c61badd6a9f8c4e01c12b6c2a3f3b6.json new file mode 100644 index 0000000..6dfc7ef --- /dev/null +++ b/.sqlx/query-9e220610e6e22f4dc5a2b9b58a7e1dbd65c61badd6a9f8c4e01c12b6c2a3f3b6.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n login.display_name as \"display_name: String\",\n login.canonical_name as \"canonical_name: String\",\n user.created_at as \"created_at: DateTime\",\n user.created_sequence as \"created_sequence: Sequence\"\n from user\n join login using (id)\n where user.created_sequence <= $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name: String", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "canonical_name: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "9e220610e6e22f4dc5a2b9b58a7e1dbd65c61badd6a9f8c4e01c12b6c2a3f3b6" +} diff --git a/.sqlx/query-be644101e1fd50880fa7c82b07cc6e9f69c64bb790d6c52ad84872f256c749aa.json 
b/.sqlx/query-be644101e1fd50880fa7c82b07cc6e9f69c64bb790d6c52ad84872f256c749aa.json deleted file mode 100644 index e56faa9..0000000 --- a/.sqlx/query-be644101e1fd50880fa7c82b07cc6e9f69c64bb790d6c52ad84872f256c749aa.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n login.display_name as \"display_name: String\",\n login.canonical_name as \"canonical_name: String\",\n user.created_sequence as \"created_sequence: Sequence\",\n user.created_at as \"created_at: DateTime\"\n from user\n join login using (id)\n where user.created_sequence <= $1\n order by canonical_name\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "created_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "be644101e1fd50880fa7c82b07cc6e9f69c64bb790d6c52ad84872f256c749aa" -} diff --git a/.sqlx/query-f9a6a39c45c3b039f139da0475c25112ffb2b26584ada60c324fc2f945c3d2fa.json b/.sqlx/query-f9a6a39c45c3b039f139da0475c25112ffb2b26584ada60c324fc2f945c3d2fa.json deleted file mode 100644 index 6050e22..0000000 --- a/.sqlx/query-f9a6a39c45c3b039f139da0475c25112ffb2b26584ada60c324fc2f945c3d2fa.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert into user (id, created_sequence, created_at)\n values ($1, $2, $3)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 3 - }, - "nullable": [] - }, - "hash": "f9a6a39c45c3b039f139da0475c25112ffb2b26584ada60c324fc2f945c3d2fa" -} diff --git a/src/user/create.rs b/src/user/create.rs index 5c060c9..21c61d1 100644 --- a/src/user/create.rs 
+++ b/src/user/create.rs @@ -3,7 +3,7 @@ use sqlx::{Transaction, sqlite::Sqlite}; use super::{History, repo::Provider as _, validate}; use crate::{ clock::DateTime, - event::{Broadcaster, Event, repo::Provider as _}, + event::{Broadcaster, Event, Sequence, repo::Provider as _}, login::{self, Login, repo::Provider as _}, name::Name, password::{Password, StoredHash}, @@ -54,7 +54,10 @@ pub struct Validated<'a> { } impl Validated<'_> { - pub async fn store(self, tx: &mut Transaction<'_, Sqlite>) -> Result { + pub async fn store( + self, + tx: &mut Transaction<'_, Sqlite>, + ) -> Result + use<>>, sqlx::Error> { let Self { name, password, @@ -63,28 +66,40 @@ impl Validated<'_> { let login = Login { id: login::Id::generate(), - name: name.to_owned(), + name: name.clone(), }; + tx.logins().create(&login, &password).await?; let created = tx.sequence().next(created_at).await?; - tx.logins().create(&login, &password).await?; - let user = tx.users().create(&login, &created).await?; + let user = History::begin(&login, created); + + let events = user.events().filter(Sequence::start_from(created)); + tx.users().record_events(events.clone()).await?; - Ok(Stored { user, login }) + Ok(Stored { + events: events.map(Event::from), + login, + }) } } #[must_use = "dropping a user creation attempt is likely a mistake"] -pub struct Stored { - user: History, +pub struct Stored { + events: E, login: Login, } -impl Stored { - pub fn publish(self, broadcaster: &Broadcaster) { - let Self { user, login: _ } = self; +impl Stored +where + E: IntoIterator, +{ + pub fn publish(self, events: &Broadcaster) { + let Self { + events: user_events, + login: _, + } = self; - broadcaster.broadcast(user.events().map(Event::from).collect::>()); + events.broadcast(user_events.into_iter().collect::>()); } pub fn login(&self) -> &Login { diff --git a/src/user/history.rs b/src/user/history.rs index f58e9c7..7c06a2d 100644 --- a/src/user/history.rs +++ b/src/user/history.rs @@ -2,7 +2,10 @@ use super::{ User, 
event::{Created, Event}, }; -use crate::event::{Instant, Sequence}; +use crate::{ + event::{Instant, Sequence}, + login::Login, +}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -10,6 +13,21 @@ pub struct History { pub created: Instant, } +// Lifecycle interface +impl History { + pub fn begin(login: &Login, created: Instant) -> Self { + let Login { id, name } = login.clone(); + + Self { + user: User { + id: id.into(), + name, + }, + created, + } + } +} + // State interface impl History { pub fn as_of(&self, sequence: S) -> Option @@ -32,7 +50,7 @@ impl History { .into() } - pub fn events(&self) -> impl Iterator + use<> { + pub fn events(&self) -> impl Iterator + Clone + use<> { [self.created()].into_iter() } } diff --git a/src/user/repo.rs b/src/user/repo.rs index aaf3b73..292d72e 100644 --- a/src/user/repo.rs +++ b/src/user/repo.rs @@ -1,13 +1,13 @@ use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; +use super::{Event, History, Id, User, event::Created}; use crate::{ clock::DateTime, db::NotFound, event::{Instant, Sequence}, login::Login, name::{self, Name}, - user::{History, Id, User}, }; pub trait Provider { @@ -23,30 +23,39 @@ impl Provider for Transaction<'_, Sqlite> { pub struct Users<'t>(&'t mut SqliteConnection); impl Users<'_> { - pub async fn create( + pub async fn record_events( &mut self, - login: &Login, - created: &Instant, - ) -> Result { + events: impl IntoIterator, + ) -> Result<(), sqlx::Error> { + for event in events { + self.record_event(&event).await?; + } + Ok(()) + } + + pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { + match event { + Event::Created(created) => self.record_created(created).await, + } + } + + async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> { + let Created { user, instant } = created; + sqlx::query!( r#" - insert into user (id, created_sequence, created_at) + insert + into user 
(id, created_at, created_sequence) values ($1, $2, $3) "#, - login.id, - created.sequence, - created.at, + user.id, + instant.at, + instant.sequence, ) .execute(&mut *self.0) .await?; - Ok(History { - user: User { - id: login.id.clone().into(), - name: login.name.clone(), - }, - created: *created, - }) + Ok(()) } pub async fn by_login(&mut self, login: &Login) -> Result { @@ -86,12 +95,11 @@ impl Users<'_> { id as "id: Id", login.display_name as "display_name: String", login.canonical_name as "canonical_name: String", - user.created_sequence as "created_sequence: Sequence", - user.created_at as "created_at: DateTime" + user.created_at as "created_at: DateTime", + user.created_sequence as "created_sequence: Sequence" from user join login using (id) where user.created_sequence <= $1 - order by canonical_name "#, resume_at, ) @@ -119,8 +127,8 @@ impl Users<'_> { id as "id: Id", login.display_name as "display_name: String", login.canonical_name as "canonical_name: String", - user.created_sequence as "created_sequence: Sequence", - user.created_at as "created_at: DateTime" + user.created_at as "created_at: DateTime", + user.created_sequence as "created_sequence: Sequence" from user join login using (id) where user.created_sequence > $1 -- cgit v1.2.3 From 17c38585fc2623a6b0196146cf6b6df9955ce979 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 18:35:54 -0400 Subject: Consolidate `events.map(…).collect()` calls into `Broadcaster`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This conversion, from an iterator of type-specific events (say, `user::Event` or `message::Event`), into a `Vec`, is pervasive, and it needs to be done each time. Having Broadcaster expose a support method for this cuts down on the repetition, at the cost of a slightly alarming amount of type-system nonsense in `broadcast_from`. 
Historical footnote: the internal message structure is a Vec and not an individual message so that bulk operations, like expiring channels and messages, won't disconnect everyone if they happen to dispatch more than sixteen messages (current queue depth limit) at once. We trade allocation and memory pressure for keeping the connections alive. _Most_ event publishing is an iterator of one item, so the Vec allocation is redundant. --- src/broadcast.rs | 22 ++++++++++++++++++++-- src/conversation/app.rs | 17 +++++------------ src/login/app.rs | 7 ++++--- src/message/app.rs | 17 +++++------------ src/token/app.rs | 7 ++++--- src/user/create.rs | 2 +- 6 files changed, 39 insertions(+), 33 deletions(-) (limited to 'src') diff --git a/src/broadcast.rs b/src/broadcast.rs index 6e1f04d..ee42c08 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -30,7 +30,7 @@ impl Broadcaster where M: Clone + Send + std::fmt::Debug + 'static, { - pub fn broadcast(&self, message: impl Into) { + pub fn broadcast(&self, message: M) { let tx = self.sender(); // Per the Tokio docs, the returned error is only used to indicate that @@ -40,7 +40,25 @@ where // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message.into()); + let _ = tx.send(message); + } + + // If `M` is a type that can be obtained from an iterator, such as a `Vec`, and if `I` is an + // iterable of items that can be collected into `M`, then this will construct an `M` from the + // passed event iterator, converting each element as it goes. This emits one message (as `M`), + // containing whatever we collect out of `messages`. + // + // This is mostly meant for handling synchronized entity events, which tend to be generated as + // iterables of domain-specific event types, like `user::Event`, but broadcast as `Vec` + // for consumption by outside clients. 
+ pub fn broadcast_from(&self, messages: I) + where + I: IntoIterator, + M: FromIterator, + E: From, + { + let message = messages.into_iter().map(Into::into).collect(); + self.broadcast(message); } pub fn subscribe(&self) -> impl Stream + std::fmt::Debug + use { diff --git a/src/conversation/app.rs b/src/conversation/app.rs index 5e07292..26886af 100644 --- a/src/conversation/app.rs +++ b/src/conversation/app.rs @@ -10,7 +10,7 @@ use super::{ use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, - event::{Broadcaster, Event, Sequence, repo::Provider as _}, + event::{Broadcaster, Sequence, repo::Provider as _}, message::repo::Provider as _, name::{self, Name}, }; @@ -49,8 +49,7 @@ impl<'a> Conversations<'a> { tx.commit().await?; - self.events - .broadcast(events.map(Event::from).collect::>()); + self.events.broadcast_from(events); Ok(conversation.as_created()) } @@ -105,8 +104,7 @@ impl<'a> Conversations<'a> { tx.commit().await?; - self.events - .broadcast(events.map(Event::from).collect::>()); + self.events.broadcast_from(events); Ok(()) } @@ -134,13 +132,8 @@ impl<'a> Conversations<'a> { tx.commit().await?; - self.events.broadcast( - events - .into_iter() - .kmerge_by(Sequence::merge) - .map(Event::from) - .collect::>(), - ); + self.events + .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); Ok(()) } diff --git a/src/login/app.rs b/src/login/app.rs index 77d4ac3..e471000 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -80,9 +80,10 @@ impl<'a> Logins<'a> { tx.tokens().create(&token, &secret).await?; tx.commit().await?; - for event in revoked.into_iter().map(TokenEvent::Revoked) { - self.token_events.broadcast(event); - } + revoked + .into_iter() + .map(TokenEvent::Revoked) + .for_each(|event| self.token_events.broadcast(event)); Ok(secret) } else { diff --git a/src/message/app.rs b/src/message/app.rs index f0a62d0..647152e 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -7,7 +7,7 @@ use crate::{ clock::DateTime, 
conversation::{self, repo::Provider as _}, db::NotFound as _, - event::{Broadcaster, Event, Sequence, repo::Provider as _}, + event::{Broadcaster, Sequence, repo::Provider as _}, login::Login, name, user::{self, repo::Provider as _}, @@ -62,8 +62,7 @@ impl<'a> Messages<'a> { tx.commit().await?; - self.events - .broadcast(events.map(Event::from).collect::>()); + self.events.broadcast_from(events); Ok(message.as_sent()) } @@ -92,8 +91,7 @@ impl<'a> Messages<'a> { tx.messages().record_events(events.clone()).await?; tx.commit().await?; - self.events - .broadcast(events.map(Event::from).collect::>()); + self.events.broadcast_from(events); Ok(()) } else { @@ -126,13 +124,8 @@ impl<'a> Messages<'a> { tx.commit().await?; - self.events.broadcast( - events - .into_iter() - .kmerge_by(Sequence::merge) - .map(Event::from) - .collect::>(), - ); + self.events + .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); Ok(()) } diff --git a/src/token/app.rs b/src/token/app.rs index fb5d712..1d68f32 100644 --- a/src/token/app.rs +++ b/src/token/app.rs @@ -102,9 +102,10 @@ impl<'a> Tokens<'a> { let tokens = tx.tokens().expire(&expire_at).await?; tx.commit().await?; - for event in tokens.into_iter().map(TokenEvent::Revoked) { - self.token_events.broadcast(event); - } + tokens + .into_iter() + .map(TokenEvent::Revoked) + .for_each(|event| self.token_events.broadcast(event)); Ok(()) } diff --git a/src/user/create.rs b/src/user/create.rs index 21c61d1..d6656e5 100644 --- a/src/user/create.rs +++ b/src/user/create.rs @@ -99,7 +99,7 @@ where login: _, } = self; - events.broadcast(user_events.into_iter().collect::>()); + events.broadcast_from(user_events); } pub fn login(&self) -> &Login { -- cgit v1.2.3 From f839449d5505b5352bd0da931b980a7a0305234f Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 26 Aug 2025 18:44:34 -0400 Subject: Remove entirely-redundant synchronization inside of Broadcaster. Per , a `Sender` is safe to share between threads. 
The clone behaviour we want is also provided by its `Clone` impl directly, and we don't need to wrap the sender in an `Arc` to share it. It's amazing what you can find in the docs. --- src/broadcast.rs | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/broadcast.rs b/src/broadcast.rs index ee42c08..dae7641 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -1,16 +1,11 @@ -use std::sync::{Arc, Mutex}; - use futures::{Stream, future, stream::StreamExt as _}; use tokio::sync::broadcast::{Sender, channel}; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; -// Clones will share the same sender. +// Clones will share the same channel. #[derive(Clone)] pub struct Broadcaster { - // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's - // own advice: . Methods that - // lock it must be sync. - senders: Arc>>, + sender: Sender, } impl Default for Broadcaster @@ -20,9 +15,7 @@ where fn default() -> Self { let sender = Self::make_sender(); - Self { - senders: Arc::new(Mutex::new(sender)), - } + Self { sender } } } @@ -31,8 +24,6 @@ where M: Clone + Send + std::fmt::Debug + 'static, { pub fn broadcast(&self, message: M) { - let tx = self.sender(); - // Per the Tokio docs, the returned error is only used to indicate that // there are no receivers. In this use case, that's fine; a lack of // listening consumers (chat clients) when a message is sent isn't an @@ -40,7 +31,7 @@ where // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. 
- let _ = tx.send(message); + let _ = self.sender.send(message); } // If `M` is a type that can be obtained from an iterator, such as a `Vec`, and if `I` is an @@ -62,7 +53,7 @@ where } pub fn subscribe(&self) -> impl Stream + std::fmt::Debug + use { - let rx = self.sender().subscribe(); + let rx = self.sender.subscribe(); BroadcastStream::from(rx).scan((), |(), r| { // The following could technically be `r.ok()`, and is exactly @@ -83,10 +74,6 @@ where }) } - fn sender(&self) -> Sender { - self.senders.lock().unwrap().clone() - } - fn make_sender() -> Sender { // Queue depth of 16 chosen entirely arbitrarily. Don't read too much // into it. -- cgit v1.2.3