summary | refs | log | tree | commit | diff
path: root/src/conversation
diff options
context:
space:
mode:
author: ojacobson <ojacobson@noreply.codeberg.org> 2025-08-27 06:10:29 +0200
committer: ojacobson <ojacobson@noreply.codeberg.org> 2025-08-27 06:10:29 +0200
commit: 8712c3a19c279d664ce75e8e90d6dde1bda56cb4 (patch)
tree: 93c95548126eea048cd8962345b720b883d391c1 /src/conversation
parent: 7b131e35fdea1a68aaf9230d157bafb200557ef8 (diff)
parent: f839449d5505b5352bd0da931b980a7a0305234f (diff)
Implement storage of synchronized entities in terms of events, not state.
Conversations, users, messages, and all other "synchronized" entities now have an in-memory implementation of their lifecycle, rather than a database-backed one. These operations take a history, apply one lifecycle change to that history, and emit a new history. Storage is then implemented by applying the events in this new history to the database.

The storage methods in repo types, which process these events by emitting SQL statements, make necessary assumptions that the events being passed to them are coherent with the data already in storage. For example, the code to handle a conversation's delete event is allowed to assume that the database already contains a row for that conversation, inserted in response to a prior conversation creation event.

Data retrieval is not modified in this commit, and probably never will be without a more thorough storage rewrite. The whole intention of the data modelling approach I've been using is that a single row per entity represents its entire history, in turn so that the data in the database should be legible to people approaching it using normal SQL tools.

Developed as an aesthetic response to increasing unease with the lack of an ORM versus the boring-ness of our actual queries.

Merges event-based-storage into main.
Diffstat (limited to 'src/conversation')
-rw-r--r--src/conversation/app.rs87
-rw-r--r--src/conversation/history.rs44
-rw-r--r--src/conversation/repo.rs138
3 files changed, 162 insertions, 107 deletions
diff --git a/src/conversation/app.rs b/src/conversation/app.rs
index 81ccdcf..26886af 100644
--- a/src/conversation/app.rs
+++ b/src/conversation/app.rs
@@ -3,15 +3,15 @@ use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
use super::{
- Conversation, Id,
+ Conversation, History, Id, history,
repo::{LoadError, Provider as _},
validate,
};
use crate::{
clock::DateTime,
db::{Duplicate as _, NotFound as _},
- event::{Broadcaster, Event, Sequence, repo::Provider as _},
- message::{self, repo::Provider as _},
+ event::{Broadcaster, Sequence, repo::Provider as _},
+ message::repo::Provider as _,
name::{self, Name},
};
@@ -36,15 +36,20 @@ impl<'a> Conversations<'a> {
let mut tx = self.db.begin().await?;
let created = tx.sequence().next(created_at).await?;
- let conversation = tx
- .conversations()
- .create(name, &created)
+ let conversation = History::begin(name, created);
+
+ // This filter technically includes every event in the history, but it's easier to follow if
+ // the various event-manipulating app methods are consistent, and it's harmless to have an
+ // always-satisfied filter.
+ let events = conversation.events().filter(Sequence::start_from(created));
+ tx.conversations()
+ .record_events(events.clone())
.await
.duplicate(|| CreateError::DuplicateName(name.clone()))?;
+
tx.commit().await?;
- self.events
- .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>());
+ self.events.broadcast_from(events);
Ok(conversation.as_created())
}
@@ -78,33 +83,28 @@ impl<'a> Conversations<'a> {
.by_id(conversation)
.await
.not_found(|| DeleteError::NotFound(conversation.clone()))?;
- conversation
- .as_snapshot()
- .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?;
-
- let mut events = Vec::new();
let messages = tx.messages().live(&conversation).await?;
+ let deleted_at = tx.sequence().next(deleted_at).await?;
+
let has_messages = messages
.iter()
- .map(message::History::as_snapshot)
+ .map(|message| message.as_of(deleted_at))
.any(|message| message.is_some());
if has_messages {
return Err(DeleteError::NotEmpty(conversation.id().clone()));
}
- let deleted = tx.sequence().next(deleted_at).await?;
- let conversation = tx.conversations().delete(&conversation, &deleted).await?;
- events.extend(
- conversation
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from),
- );
+ let conversation = conversation.delete(deleted_at)?;
+
+ let events = conversation
+ .events()
+ .filter(Sequence::start_from(deleted_at));
+ tx.conversations().record_events(events.clone()).await?;
tx.commit().await?;
- self.events.broadcast(events);
+ self.events.broadcast_from(events);
Ok(())
}
@@ -120,23 +120,20 @@ impl<'a> Conversations<'a> {
let mut events = Vec::with_capacity(expired.len());
for conversation in expired {
let deleted = tx.sequence().next(relative_to).await?;
- let conversation = tx.conversations().delete(&conversation, &deleted).await?;
- events.push(
- conversation
- .events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
+ let conversation = conversation.delete(deleted)?;
+
+ let conversation_events = conversation.events().filter(Sequence::start_from(deleted));
+ tx.conversations()
+ .record_events(conversation_events.clone())
+ .await?;
+
+ events.push(conversation_events);
}
tx.commit().await?;
- self.events.broadcast(
- events
- .into_iter()
- .kmerge_by(Sequence::merge)
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ self.events
+ .broadcast_from(events.into_iter().kmerge_by(Sequence::merge));
Ok(())
}
@@ -218,8 +215,18 @@ impl From<LoadError> for DeleteError {
}
}
+impl From<history::DeleteError> for DeleteError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()),
+ }
+ }
+}
+
#[derive(Debug, thiserror::Error)]
pub enum ExpireError {
+ #[error("tried to expire already-deleted conversation: {0}")]
+ Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
#[error(transparent)]
@@ -234,3 +241,11 @@ impl From<LoadError> for ExpireError {
}
}
}
+
+impl From<history::DeleteError> for ExpireError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()),
+ }
+ }
+}
diff --git a/src/conversation/history.rs b/src/conversation/history.rs
index 746a1b0..5cba9ca 100644
--- a/src/conversation/history.rs
+++ b/src/conversation/history.rs
@@ -4,13 +4,49 @@ use super::{
Conversation, Id,
event::{Created, Deleted, Event},
};
-use crate::event::Sequence;
+use crate::{
+ event::{Instant, Sequence},
+ name::Name,
+};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
pub conversation: Conversation,
}
+// Lifecycle interface
+impl History {
+ pub fn begin(name: &Name, created: Instant) -> Self {
+ Self {
+ conversation: Conversation {
+ id: Id::generate(),
+ name: name.clone(),
+ created,
+ deleted: None,
+ },
+ }
+ }
+
+ pub fn delete(self, deleted: Instant) -> Result<Self, DeleteError> {
+ if self.conversation.deleted.is_none() {
+ Ok(Self {
+ conversation: Conversation {
+ deleted: Some(deleted),
+ ..self.conversation
+ },
+ })
+ } else {
+ Err(DeleteError::Deleted(self))
+ }
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum DeleteError {
+ #[error("conversation {} already deleted", .0.conversation.id)]
+ Deleted(History),
+}
+
// State interface
impl History {
pub fn id(&self) -> &Id {
@@ -30,9 +66,7 @@ impl History {
where
S: Into<Sequence>,
{
- self.events()
- .filter(Sequence::up_to(sequence.into()))
- .collect()
+ self.events().filter(Sequence::up_to(sequence)).collect()
}
// Snapshot of this conversation as of all events recorded in this history.
@@ -43,7 +77,7 @@ impl History {
// Event factories
impl History {
- pub fn events(&self) -> impl Iterator<Item = Event> + use<> {
+ pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> {
[self.created()]
.into_iter()
.merge_by(self.deleted(), Sequence::merge)
diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs
index 7e38b62..cb66bf8 100644
--- a/src/conversation/repo.rs
+++ b/src/conversation/repo.rs
@@ -1,9 +1,12 @@
use futures::stream::{StreamExt as _, TryStreamExt as _};
use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
+use super::{
+ Conversation, Event, History, Id,
+ event::{Created, Deleted},
+};
use crate::{
clock::DateTime,
- conversation::{Conversation, History, Id},
db::NotFound,
event::{Instant, Sequence},
name::{self, Name},
@@ -22,22 +25,41 @@ impl Provider for Transaction<'_, Sqlite> {
pub struct Conversations<'t>(&'t mut SqliteConnection);
impl Conversations<'_> {
- pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> {
- let id = Id::generate();
- let name = name.clone();
+ pub async fn record_events(
+ &mut self,
+ events: impl IntoIterator<Item = Event>,
+ ) -> Result<(), sqlx::Error> {
+ for event in events {
+ self.record_event(&event).await?;
+ }
+ Ok(())
+ }
+
+ pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> {
+ match event {
+ Event::Created(created) => self.record_created(created).await,
+ Event::Deleted(deleted) => self.record_deleted(deleted).await,
+ }
+ }
+
+ async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> {
+ let Conversation {
+ id,
+ created,
+ name,
+ deleted: _,
+ } = &created.conversation;
let display_name = name.display();
let canonical_name = name.canonical();
- let created = *created;
sqlx::query!(
r#"
insert into conversation (id, created_at, created_sequence, last_sequence)
- values ($1, $2, $3, $4)
+ values ($1, $2, $3, $3)
"#,
id,
created.at,
created.sequence,
- created.sequence,
)
.execute(&mut *self.0)
.await?;
@@ -54,16 +76,50 @@ impl Conversations<'_> {
.execute(&mut *self.0)
.await?;
- let conversation = History {
- conversation: Conversation {
- created,
- id,
- name: name.clone(),
- deleted: None,
- },
- };
+ Ok(())
+ }
- Ok(conversation)
+ async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> {
+ let Deleted { instant, id } = deleted;
+ sqlx::query!(
+ r#"
+ update conversation
+ set last_sequence = max(last_sequence, $1)
+ where id = $2
+ "#,
+ instant.sequence,
+ id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ insert into conversation_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ "#,
+ id,
+ instant.at,
+ instant.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a conversation is deleted, its
+ // name is retconned to have been the empty string. Someone reading the event
+ // stream afterwards, or looking at conversations via the API, cannot retrieve
+ // the "deleted" conversation's information by ignoring the deletion event.
+ sqlx::query!(
+ r#"
+ delete from conversation_name
+ where id = $1
+ "#,
+ id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
}
pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> {
@@ -179,56 +235,6 @@ impl Conversations<'_> {
Ok(conversations)
}
- pub async fn delete(
- &mut self,
- conversation: &History,
- deleted: &Instant,
- ) -> Result<History, LoadError> {
- let id = conversation.id();
- sqlx::query!(
- r#"
- update conversation
- set last_sequence = max(last_sequence, $1)
- where id = $2
- returning id as "id: Id"
- "#,
- deleted.sequence,
- id,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- sqlx::query!(
- r#"
- insert into conversation_deleted (id, deleted_at, deleted_sequence)
- values ($1, $2, $3)
- "#,
- id,
- deleted.at,
- deleted.sequence,
- )
- .execute(&mut *self.0)
- .await?;
-
- // Small social responsibility hack here: when a conversation is deleted, its
- // name is retconned to have been the empty string. Someone reading the event
- // stream afterwards, or looking at conversations via the API, cannot retrieve
- // the "deleted" conversation's information by ignoring the deletion event.
- sqlx::query!(
- r#"
- delete from conversation_name
- where id = $1
- "#,
- id,
- )
- .execute(&mut *self.0)
- .await?;
-
- let conversation = self.by_id(id).await?;
-
- Ok(conversation)
- }
-
pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let conversations = sqlx::query_scalar!(
r#"