summaryrefslogtreecommitdiff
path: root/src/user
diff options
context:
space:
mode:
authorojacobson <ojacobson@noreply.codeberg.org>2025-08-27 06:10:29 +0200
committerojacobson <ojacobson@noreply.codeberg.org>2025-08-27 06:10:29 +0200
commit8712c3a19c279d664ce75e8e90d6dde1bda56cb4 (patch)
tree93c95548126eea048cd8962345b720b883d391c1 /src/user
parent7b131e35fdea1a68aaf9230d157bafb200557ef8 (diff)
parentf839449d5505b5352bd0da931b980a7a0305234f (diff)
Implement storage of synchronized entities in terms of events, not state.
Conversations, users, messages, and all other "synchronized" entities now have an in-memory implementation of their lifecycle, rather than a database-backed one. These operations take a history, apply one lifecycle change to that history, and emit a new history. Storage is then implemented by applying the events in this new history to the database. The storage methods in repo types, which process these events by emitting SQL statements, make necessary assumptions that the events being passed to them are coherent with the data already in storage. For example, the code to handle a conversation's delete event is allowed to assume that the database already contains a row for that conversation, inserted in response to a prior conversation creation event. Data retrieval is not modified in this commit, and probably never will be without a more thorough storage rewrite. The whole intention of the data modelling approach I've been using is that a single row per entity represents its entire history, in turn so that the data in the database should be legible to people approaching it using normal SQL tools. Developed as an aesthetic response to increasing unease with the lack of an ORM versus the boring-ness of our actual queries. Merges event-based-storage into main.
Diffstat (limited to 'src/user')
-rw-r--r--src/user/create.rs39
-rw-r--r--src/user/history.rs22
-rw-r--r--src/user/repo.rs50
3 files changed, 76 insertions, 35 deletions
diff --git a/src/user/create.rs b/src/user/create.rs
index 5c060c9..d6656e5 100644
--- a/src/user/create.rs
+++ b/src/user/create.rs
@@ -3,7 +3,7 @@ use sqlx::{Transaction, sqlite::Sqlite};
use super::{History, repo::Provider as _, validate};
use crate::{
clock::DateTime,
- event::{Broadcaster, Event, repo::Provider as _},
+ event::{Broadcaster, Event, Sequence, repo::Provider as _},
login::{self, Login, repo::Provider as _},
name::Name,
password::{Password, StoredHash},
@@ -54,7 +54,10 @@ pub struct Validated<'a> {
}
impl Validated<'_> {
- pub async fn store(self, tx: &mut Transaction<'_, Sqlite>) -> Result<Stored, sqlx::Error> {
+ pub async fn store(
+ self,
+ tx: &mut Transaction<'_, Sqlite>,
+ ) -> Result<Stored<impl IntoIterator<Item = Event> + use<>>, sqlx::Error> {
let Self {
name,
password,
@@ -63,28 +66,40 @@ impl Validated<'_> {
let login = Login {
id: login::Id::generate(),
- name: name.to_owned(),
+ name: name.clone(),
};
+ tx.logins().create(&login, &password).await?;
let created = tx.sequence().next(created_at).await?;
- tx.logins().create(&login, &password).await?;
- let user = tx.users().create(&login, &created).await?;
+ let user = History::begin(&login, created);
+
+ let events = user.events().filter(Sequence::start_from(created));
+ tx.users().record_events(events.clone()).await?;
- Ok(Stored { user, login })
+ Ok(Stored {
+ events: events.map(Event::from),
+ login,
+ })
}
}
#[must_use = "dropping a user creation attempt is likely a mistake"]
-pub struct Stored {
- user: History,
+pub struct Stored<E> {
+ events: E,
login: Login,
}
-impl Stored {
- pub fn publish(self, broadcaster: &Broadcaster) {
- let Self { user, login: _ } = self;
+impl<E> Stored<E>
+where
+ E: IntoIterator<Item = Event>,
+{
+ pub fn publish(self, events: &Broadcaster) {
+ let Self {
+ events: user_events,
+ login: _,
+ } = self;
- broadcaster.broadcast(user.events().map(Event::from).collect::<Vec<_>>());
+ events.broadcast_from(user_events);
}
pub fn login(&self) -> &Login {
diff --git a/src/user/history.rs b/src/user/history.rs
index f58e9c7..7c06a2d 100644
--- a/src/user/history.rs
+++ b/src/user/history.rs
@@ -2,7 +2,10 @@ use super::{
User,
event::{Created, Event},
};
-use crate::event::{Instant, Sequence};
+use crate::{
+ event::{Instant, Sequence},
+ login::Login,
+};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
@@ -10,6 +13,21 @@ pub struct History {
pub created: Instant,
}
+// Lifecycle interface
+impl History {
+ pub fn begin(login: &Login, created: Instant) -> Self {
+ let Login { id, name } = login.clone();
+
+ Self {
+ user: User {
+ id: id.into(),
+ name,
+ },
+ created,
+ }
+ }
+}
+
// State interface
impl History {
pub fn as_of<S>(&self, sequence: S) -> Option<User>
@@ -32,7 +50,7 @@ impl History {
.into()
}
- pub fn events(&self) -> impl Iterator<Item = Event> + use<> {
+ pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> {
[self.created()].into_iter()
}
}
diff --git a/src/user/repo.rs b/src/user/repo.rs
index aaf3b73..292d72e 100644
--- a/src/user/repo.rs
+++ b/src/user/repo.rs
@@ -1,13 +1,13 @@
use futures::stream::{StreamExt as _, TryStreamExt as _};
use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
+use super::{Event, History, Id, User, event::Created};
use crate::{
clock::DateTime,
db::NotFound,
event::{Instant, Sequence},
login::Login,
name::{self, Name},
- user::{History, Id, User},
};
pub trait Provider {
@@ -23,30 +23,39 @@ impl Provider for Transaction<'_, Sqlite> {
pub struct Users<'t>(&'t mut SqliteConnection);
impl Users<'_> {
- pub async fn create(
+ pub async fn record_events(
&mut self,
- login: &Login,
- created: &Instant,
- ) -> Result<History, sqlx::Error> {
+ events: impl IntoIterator<Item = Event>,
+ ) -> Result<(), sqlx::Error> {
+ for event in events {
+ self.record_event(&event).await?;
+ }
+ Ok(())
+ }
+
+ pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> {
+ match event {
+ Event::Created(created) => self.record_created(created).await,
+ }
+ }
+
+ async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> {
+ let Created { user, instant } = created;
+
sqlx::query!(
r#"
- insert into user (id, created_sequence, created_at)
+ insert
+ into user (id, created_at, created_sequence)
values ($1, $2, $3)
"#,
- login.id,
- created.sequence,
- created.at,
+ user.id,
+ instant.at,
+ instant.sequence,
)
.execute(&mut *self.0)
.await?;
- Ok(History {
- user: User {
- id: login.id.clone().into(),
- name: login.name.clone(),
- },
- created: *created,
- })
+ Ok(())
}
pub async fn by_login(&mut self, login: &Login) -> Result<History, LoadError> {
@@ -86,12 +95,11 @@ impl Users<'_> {
id as "id: Id",
login.display_name as "display_name: String",
login.canonical_name as "canonical_name: String",
- user.created_sequence as "created_sequence: Sequence",
- user.created_at as "created_at: DateTime"
+ user.created_at as "created_at: DateTime",
+ user.created_sequence as "created_sequence: Sequence"
from user
join login using (id)
where user.created_sequence <= $1
- order by canonical_name
"#,
resume_at,
)
@@ -119,8 +127,8 @@ impl Users<'_> {
id as "id: Id",
login.display_name as "display_name: String",
login.canonical_name as "canonical_name: String",
- user.created_sequence as "created_sequence: Sequence",
- user.created_at as "created_at: DateTime"
+ user.created_at as "created_at: DateTime",
+ user.created_sequence as "created_sequence: Sequence"
from user
join login using (id)
where user.created_sequence > $1