author    ojacobson <ojacobson@noreply.codeberg.org>  2025-07-01 02:45:39 +0200
committer ojacobson <ojacobson@noreply.codeberg.org>  2025-07-01 02:45:39 +0200
commit    c0c825477e476d6d7331bfc409bceff9c376b484 (patch)
tree      2658c58137c64beec670b1a87b46e6ba4d6eb7e7 /src
parent    022ae4ecf5f05b72280b1d702cb92d6795485639 (diff)
parent    7778cdf0c495a04f4f5f3f85b78348c8037a5771 (diff)
Send back the current state as events, not snapshots, during client boot.
There are a couple of contributing reasons for this.

* Each client's author - even ourselves - is best positioned to know how to convert history into state to meet the needs of that specific client. There is (probably) no universal solution. You can already see this with the built-in client, where unread tracking gets stapled onto snapshots locally and maintained as events roll in, and I would expect this to happen more and more often over time. If we ever sprout other clients, I'd also expect their local state to differ. The API, on the other hand, must expose a surface that's universal to all clients. For boot, that was a very rote list-of-nouns data model. The other option is to expose a surface specific to one client and make other clients accommodate it, which is contrary to the goals of this project.
* The need to compute snapshots adds friction when adding or changing the behaviour of the API, even when those changes only tangentially touch `/api/boot`. For example, my work on adding messages to multiple conversations got hung up on figuring out how to represent that at boot time, on top of how to represent it in the event stream.
* The rationale for sending back a computed snapshot of the state was to avoid having the client replay events from the beginning of time, and to limit the amount of data sent back. This didn't pan out: in practice, most snapshots consisted of the same data you'd get from the event stream anyway, almost with a 1:1 correspondence (a `sent` or `created` event standing in for a `messages`, `channels`, or `users` entry). The exceptions - deleted messages and channels - were rare, and are ephemeral.
* Generating the snapshots requires loading the entire history into memory anyway. We're not saving any server-side IO by computing snapshots, but we are spending server-side compute time to generate them for clients that then throw them away, as above.
This change resolves these tensions by delegating state management _entirely_ to the client, removing the server-side state snapshots. The server communicates in events only.

## Alternatives

I joked to @wlonk that the "2.0-bis" version of this change always returns `resume_point` 0 and an empty events list. That would be correct, compatible with the client logic in this change, and would actually work. In fact, we could get rid of the event part of `/api/boot` entirely, and require clients to consume the event stream from the beginning every time they reconnect.

The main reason I _don't_ want to do this has to do with reconnects. Right now - both with snapshots, before this change, and with events, after - the client can cleanly delineate "historical" events (to be applied while the state is not presented to the user) from "current" events (to be presented to the user immediately). The `text/event-stream` protocol has no way to make that distinction out of the box, and while we could hack something in, all the approaches I can think of are nasty.

Merges boot-events into main.
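The mechanics of the new boot path (see `src/boot/app.rs` below) depend on each per-entity `History` yielding its events already ordered by sequence number, so a k-way merge - `kmerge_by` with `Sequence::merge` as the predicate, then a `Sequence::up_to(resume_point)` filter - produces one globally ordered, truncated event stream. A std-only sketch of that idea, with simplified stand-in `Sequence` and `Event` types (not the crate's real definitions, and a hand-rolled merge in place of itertools' `kmerge_by`):

```rust
use std::iter::Peekable;
use std::vec::IntoIter;

// Simplified stand-ins for the crate's Sequence and Event types.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Sequence(u64);

#[derive(Debug, Clone, PartialEq)]
struct Event {
    sequence: Sequence,
    kind: &'static str,
}

// K-way merge of per-entity histories, each already sorted by sequence.
// This plays the role of `kmerge_by(Sequence::merge)` in the real code.
fn kmerge(mut streams: Vec<Peekable<IntoIter<Event>>>) -> Vec<Event> {
    let mut merged = Vec::new();
    loop {
        // Find the stream whose next event has the smallest sequence.
        let mut best: Option<(usize, Sequence)> = None;
        for (i, stream) in streams.iter_mut().enumerate() {
            if let Some(event) = stream.peek() {
                if best.map_or(true, |(_, seq)| event.sequence < seq) {
                    best = Some((i, event.sequence));
                }
            }
        }
        match best {
            Some((i, _)) => merged.push(streams[i].next().expect("peeked")),
            None => return merged, // all streams exhausted
        }
    }
}

fn main() {
    let users = vec![Event { sequence: Sequence(1), kind: "user.created" }];
    let channels = vec![
        Event { sequence: Sequence(2), kind: "channel.created" },
        Event { sequence: Sequence(5), kind: "channel.deleted" },
    ];
    let messages = vec![Event { sequence: Sequence(3), kind: "message.sent" }];

    let resume_point = Sequence(4);
    let events: Vec<Event> = kmerge(
        vec![users, channels, messages]
            .into_iter()
            .map(|history| history.into_iter().peekable())
            .collect(),
    )
    .into_iter()
    // Mirrors `.filter(Sequence::up_to(resume_point))`: drop events recorded
    // after the snapshot's resume point.
    .filter(|event| event.sequence <= resume_point)
    .collect();

    let kinds: Vec<_> = events.iter().map(|event| event.kind).collect();
    assert_eq!(kinds, ["user.created", "channel.created", "message.sent"]);
}
```

Because every event carries its sequence number, the client can fold this stream into whatever local state shape it wants, which is the point of the change.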
Diffstat (limited to 'src')
-rw-r--r--  src/boot/app.rs                            |  44
-rw-r--r--  src/boot/handlers/boot/mod.rs              |  20
-rw-r--r--  src/boot/handlers/boot/test.rs             | 133
-rw-r--r--  src/boot/mod.rs                            |  19
-rw-r--r--  src/channel/handlers/create/test.rs        |  14
-rw-r--r--  src/channel/handlers/delete/test.rs        |  12
-rw-r--r--  src/channel/handlers/send/test.rs          |   4
-rw-r--r--  src/channel/history.rs                     |   6
-rw-r--r--  src/event/handlers/stream/test/channel.rs  |  28
-rw-r--r--  src/event/handlers/stream/test/invite.rs   |   8
-rw-r--r--  src/event/handlers/stream/test/message.rs  |  36
-rw-r--r--  src/event/handlers/stream/test/resume.rs   |  20
-rw-r--r--  src/event/handlers/stream/test/setup.rs    |   4
-rw-r--r--  src/event/handlers/stream/test/token.rs    |  12
-rw-r--r--  src/message/handlers/delete/test.rs        |  10
-rw-r--r--  src/message/history.rs                     |   6
-rw-r--r--  src/test/fixtures/event.rs                 |  79
-rw-r--r--  src/test/fixtures/event/mod.rs             |  74
-rw-r--r--  src/test/fixtures/event/stream.rs          |  62
-rw-r--r--  src/user/history.rs                        |   8
20 files changed, 392 insertions, 207 deletions
diff --git a/src/boot/app.rs b/src/boot/app.rs
index cd45c38..89eec12 100644
--- a/src/boot/app.rs
+++ b/src/boot/app.rs
@@ -1,10 +1,11 @@
+use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;
use super::Snapshot;
use crate::{
channel::{self, repo::Provider as _},
- event::{Heartbeat, repo::Provider as _},
- message::repo::Provider as _,
+ event::{Event, Sequence, repo::Provider as _},
+ message::{self, repo::Provider as _},
name,
user::{self, repo::Provider as _},
};
@@ -21,7 +22,6 @@ impl<'a> Boot<'a> {
pub async fn snapshot(&self) -> Result<Snapshot, Error> {
let mut tx = self.db.begin().await?;
let resume_point = tx.sequence().current().await?;
- let heartbeat = Heartbeat::TIMEOUT;
let users = tx.users().all(resume_point).await?;
let channels = tx.channels().all(resume_point).await?;
@@ -29,27 +29,35 @@ impl<'a> Boot<'a> {
tx.commit().await?;
- let users = users
- .into_iter()
- .filter_map(|user| user.as_of(resume_point))
- .collect();
+ let user_events = users
+ .iter()
+ .map(user::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::up_to(resume_point))
+ .map(Event::from);
- let channels = channels
- .into_iter()
- .filter_map(|channel| channel.as_of(resume_point))
- .collect();
+ let channel_events = channels
+ .iter()
+ .map(channel::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::up_to(resume_point))
+ .map(Event::from);
+
+ let message_events = messages
+ .iter()
+ .map(message::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::up_to(resume_point))
+ .map(Event::from);
- let messages = messages
- .into_iter()
- .filter_map(|message| message.as_of(resume_point))
+ let events = user_events
+ .merge_by(channel_events, Sequence::merge)
+ .merge_by(message_events, Sequence::merge)
.collect();
Ok(Snapshot {
resume_point,
- heartbeat,
- users,
- channels,
- messages,
+ events,
})
}
}
diff --git a/src/boot/handlers/boot/mod.rs b/src/boot/handlers/boot/mod.rs
index 010f57b..49691f7 100644
--- a/src/boot/handlers/boot/mod.rs
+++ b/src/boot/handlers/boot/mod.rs
@@ -1,17 +1,26 @@
+use std::time::Duration;
+
use axum::{
extract::{Json, State},
response::{self, IntoResponse},
};
+use serde::Serialize;
-use crate::{app::App, boot::Snapshot, error::Internal, token::extract::Identity, user::User};
+use crate::{
+ app::App, boot::Snapshot, error::Internal, event::Heartbeat, token::extract::Identity,
+ user::User,
+};
#[cfg(test)]
mod test;
pub async fn handler(State(app): State<App>, identity: Identity) -> Result<Response, Internal> {
let snapshot = app.boot().snapshot().await?;
+ let heartbeat = Heartbeat::TIMEOUT;
+
Ok(Response {
user: identity.user,
+ heartbeat,
snapshot,
})
}
@@ -19,6 +28,8 @@ pub async fn handler(State(app): State<App>, identity: Identity) -> Result<Respo
#[derive(serde::Serialize)]
pub struct Response {
pub user: User,
+ #[serde(serialize_with = "as_seconds")]
+ pub heartbeat: Duration,
#[serde(flatten)]
pub snapshot: Snapshot,
}
@@ -28,3 +39,10 @@ impl IntoResponse for Response {
Json(self).into_response()
}
}
+
+fn as_seconds<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
+where
+ S: serde::Serializer,
+{
+ duration.as_secs().serialize(serializer)
+}
diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs
index 0a7622b..1e590a7 100644
--- a/src/boot/handlers/boot/test.rs
+++ b/src/boot/handlers/boot/test.rs
@@ -1,4 +1,5 @@
use axum::extract::State;
+use itertools::Itertools as _;
use crate::test::fixtures;
@@ -15,7 +16,7 @@ async fn returns_identity() {
}
#[tokio::test]
-async fn includes_logins() {
+async fn includes_users() {
let app = fixtures::scratch_app().await;
let spectator = fixtures::user::create(&app, &fixtures::now()).await;
@@ -24,7 +25,15 @@ async fn includes_logins() {
.await
.expect("boot always succeeds");
- assert!(response.snapshot.users.contains(&spectator));
+ let created = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::user)
+ .filter_map(fixtures::event::user::created)
+ .exactly_one()
+ .expect("only one user has been created");
+ assert_eq!(spectator, created.user)
}
#[tokio::test]
@@ -37,7 +46,15 @@ async fn includes_channels() {
.await
.expect("boot always succeeds");
- assert!(response.snapshot.channels.contains(&channel));
+ let created = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .exactly_one()
+ .expect("only one channel has been created");
+ assert_eq!(channel, created.channel);
}
#[tokio::test]
@@ -52,11 +69,19 @@ async fn includes_messages() {
.await
.expect("boot always succeeds");
- assert!(response.snapshot.messages.contains(&message));
+ let sent = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .exactly_one()
+ .expect("only one message has been sent");
+ assert_eq!(message, sent.message);
}
#[tokio::test]
-async fn excludes_expired_messages() {
+async fn includes_expired_messages() {
let app = fixtures::scratch_app().await;
let sender = fixtures::user::create(&app, &fixtures::ancient()).await;
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
@@ -73,11 +98,32 @@ async fn excludes_expired_messages() {
.await
.expect("boot always succeeds");
- assert!(!response.snapshot.messages.contains(&expired_message));
+ let sent = response
+ .snapshot
+ .events
+ .iter()
+ .cloned()
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .exactly_one()
+ .expect("only one message has been sent");
+ // We don't expect `expired_message` to match the event exactly, as the body will have been
+ // tombstoned and the message given a `deleted_at` date.
+ assert_eq!(expired_message.id, sent.message.id);
+
+ let deleted = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .exactly_one()
+ .expect("only one message has expired");
+ assert_eq!(expired_message.id, deleted.id);
}
#[tokio::test]
-async fn excludes_deleted_messages() {
+async fn includes_deleted_messages() {
let app = fixtures::scratch_app().await;
let sender = fixtures::user::create(&app, &fixtures::now()).await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
@@ -93,11 +139,32 @@ async fn excludes_deleted_messages() {
.await
.expect("boot always succeeds");
- assert!(!response.snapshot.messages.contains(&deleted_message));
+ let sent = response
+ .snapshot
+ .events
+ .iter()
+ .cloned()
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .exactly_one()
+ .expect("only one message has been sent");
+ // We don't expect `deleted_message` to match the event exactly, as the body will have been
+ // tombstoned and the message given a `deleted_at` date.
+ assert_eq!(deleted_message.id, sent.message.id);
+
+ let deleted = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .exactly_one()
+ .expect("only one message has been deleted");
+ assert_eq!(deleted_message.id, deleted.id);
}
#[tokio::test]
-async fn excludes_expired_channels() {
+async fn includes_expired_channels() {
let app = fixtures::scratch_app().await;
let expired_channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
@@ -111,11 +178,32 @@ async fn excludes_expired_channels() {
.await
.expect("boot always succeeds");
- assert!(!response.snapshot.channels.contains(&expired_channel));
+ let created = response
+ .snapshot
+ .events
+ .iter()
+ .cloned()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .exactly_one()
+ .expect("only one channel has been created");
+ // We don't expect `expired_channel` to match the event exactly, as the name will have been
+ // tombstoned and the channel given a `deleted_at` date.
+ assert_eq!(expired_channel.id, created.channel.id);
+
+ let deleted = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .exactly_one()
+ .expect("only one channel has expired");
+ assert_eq!(expired_channel.id, deleted.id);
}
#[tokio::test]
-async fn excludes_deleted_channels() {
+async fn includes_deleted_channels() {
let app = fixtures::scratch_app().await;
let deleted_channel = fixtures::channel::create(&app, &fixtures::now()).await;
@@ -129,5 +217,26 @@ async fn excludes_deleted_channels() {
.await
.expect("boot always succeeds");
- assert!(!response.snapshot.channels.contains(&deleted_channel));
+ let created = response
+ .snapshot
+ .events
+ .iter()
+ .cloned()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .exactly_one()
+ .expect("only one channel has been created");
+ // We don't expect `deleted_channel` to match the event exactly, as the name will have been
+ // tombstoned and the channel given a `deleted_at` date.
+ assert_eq!(deleted_channel.id, created.channel.id);
+
+ let deleted = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .exactly_one()
+ .expect("only one channel has been deleted");
+ assert_eq!(deleted_channel.id, deleted.id);
}
diff --git a/src/boot/mod.rs b/src/boot/mod.rs
index 48da4f0..e0d35d9 100644
--- a/src/boot/mod.rs
+++ b/src/boot/mod.rs
@@ -1,8 +1,4 @@
-use std::time::Duration;
-
-use serde::Serialize;
-
-use crate::{channel::Channel, event::Sequence, message::Message, user::User};
+use crate::{event::Event, event::Sequence};
pub mod app;
pub mod handlers;
@@ -10,16 +6,5 @@ pub mod handlers;
#[derive(serde::Serialize)]
pub struct Snapshot {
pub resume_point: Sequence,
- #[serde(serialize_with = "as_seconds")]
- pub heartbeat: Duration,
- pub users: Vec<User>,
- pub channels: Vec<Channel>,
- pub messages: Vec<Message>,
-}
-
-fn as_seconds<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
-where
- S: serde::Serializer,
-{
- duration.as_secs().serialize(serializer)
+ pub events: Vec<Event>,
}
diff --git a/src/channel/handlers/create/test.rs b/src/channel/handlers/create/test.rs
index 3c770cf..31bb778 100644
--- a/src/channel/handlers/create/test.rs
+++ b/src/channel/handlers/create/test.rs
@@ -2,6 +2,7 @@ use std::future;
use axum::extract::{Json, State};
use futures::stream::StreamExt as _;
+use itertools::Itertools;
use crate::{
channel::app,
@@ -33,7 +34,14 @@ async fn new_channel() {
// Verify the semantics
let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- assert!(snapshot.channels.iter().any(|channel| channel == &response));
+ let created = snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .exactly_one()
+ .expect("only one channel has been created");
+ assert_eq!(response, created.channel);
let channel = app
.channels()
@@ -47,8 +55,8 @@ async fn new_channel() {
.subscribe(resume_point)
.await
.expect("subscribing never fails")
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::created)
.filter(|event| future::ready(event.channel == response));
let event = events.next().expect_some("creation event published").await;
diff --git a/src/channel/handlers/delete/test.rs b/src/channel/handlers/delete/test.rs
index b1e42ea..99c19db 100644
--- a/src/channel/handlers/delete/test.rs
+++ b/src/channel/handlers/delete/test.rs
@@ -1,4 +1,5 @@
use axum::extract::{Path, State};
+use itertools::Itertools;
use crate::{channel::app, test::fixtures};
@@ -28,7 +29,16 @@ pub async fn valid_channel() {
// Verify the semantics
let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- assert!(!snapshot.channels.contains(&channel));
+ let created = snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .exactly_one()
+ .expect("only one channel has been created");
+ // We don't expect `channel` to match the event exactly, as the name will have been
+ // tombstoned and the channel given a `deleted_at` date.
+ assert_eq!(channel.id, created.channel.id);
}
#[tokio::test]
diff --git a/src/channel/handlers/send/test.rs b/src/channel/handlers/send/test.rs
index f43f901..7204ca4 100644
--- a/src/channel/handlers/send/test.rs
+++ b/src/channel/handlers/send/test.rs
@@ -45,8 +45,8 @@ async fn messages_in_order() {
.subscribe(resume_point)
.await
.expect("subscribing to a valid channel succeeds")
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(requests));
while let Some((event, (sent_at, body))) = events
diff --git a/src/channel/history.rs b/src/channel/history.rs
index faf6a0e..7f18e45 100644
--- a/src/channel/history.rs
+++ b/src/channel/history.rs
@@ -27,12 +27,6 @@ impl History {
self.channel.clone()
}
- pub fn as_of(&self, resume_point: Sequence) -> Option<Channel> {
- self.events()
- .filter(Sequence::up_to(resume_point))
- .collect()
- }
-
// Snapshot of this channel as of all events recorded in this history.
pub fn as_snapshot(&self) -> Option<Channel> {
self.events().collect()
diff --git a/src/event/handlers/stream/test/channel.rs b/src/event/handlers/stream/test/channel.rs
index 187c3c3..2b87ce2 100644
--- a/src/event/handlers/stream/test/channel.rs
+++ b/src/event/handlers/stream/test/channel.rs
@@ -35,8 +35,8 @@ async fn creating() {
// Verify channel created event
events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::created)
.filter(|event| future::ready(event.channel == channel))
.next()
.expect_some("channel created event is delivered")
@@ -74,8 +74,8 @@ async fn previously_created() {
// Verify channel created event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::created)
.filter(|event| future::ready(event.channel == channel))
.next()
.expect_some("channel created event is delivered")
@@ -111,8 +111,8 @@ async fn expiring() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_some("a deleted channel event will be delivered")
@@ -148,8 +148,8 @@ async fn previously_expired() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_some("a deleted channel event will be delivered")
@@ -185,8 +185,8 @@ async fn deleting() {
// Check for delete event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_some("a deleted channel event will be delivered")
@@ -222,8 +222,8 @@ async fn previously_deleted() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_some("a deleted channel event will be delivered")
@@ -264,8 +264,8 @@ async fn previously_purged() {
// Check for expiry event
events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_wait("deleted channel events not delivered")
diff --git a/src/event/handlers/stream/test/invite.rs b/src/event/handlers/stream/test/invite.rs
index c8e12fb..01372ce 100644
--- a/src/event/handlers/stream/test/invite.rs
+++ b/src/event/handlers/stream/test/invite.rs
@@ -37,8 +37,8 @@ async fn accepting_invite() {
// Expect a login created event
let _ = events
- .filter_map(fixtures::event::user)
- .filter_map(fixtures::event::user::created)
+ .filter_map(fixtures::event::stream::user)
+ .filter_map(fixtures::event::stream::user::created)
.filter(|event| future::ready(event.user == joiner))
.next()
.expect_some("a login created event is sent")
@@ -78,8 +78,8 @@ async fn previously_accepted_invite() {
// Expect a login created event
let _ = events
- .filter_map(fixtures::event::user)
- .filter_map(fixtures::event::user::created)
+ .filter_map(fixtures::event::stream::user)
+ .filter_map(fixtures::event::stream::user::created)
.filter(|event| future::ready(event.user == joiner))
.next()
.expect_some("a login created event is sent")
diff --git a/src/event/handlers/stream/test/message.rs b/src/event/handlers/stream/test/message.rs
index a80c896..4369996 100644
--- a/src/event/handlers/stream/test/message.rs
+++ b/src/event/handlers/stream/test/message.rs
@@ -44,8 +44,8 @@ async fn sending() {
// Verify that an event is delivered
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(event.message == message))
.next()
.expect_some("delivered message sent event")
@@ -89,8 +89,8 @@ async fn previously_sent() {
// Verify that an event is delivered
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(event.message == message))
.next()
.expect_some("delivered message sent event")
@@ -135,8 +135,8 @@ async fn sent_in_multiple_channels() {
// Verify the structure of the response.
let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.take(messages.len())
.collect::<Vec<_>>()
.expect_ready("events ready")
@@ -177,8 +177,8 @@ async fn sent_sequentially() {
// Verify the expected events in the expected order
let mut events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)));
for message in &messages {
@@ -222,8 +222,8 @@ async fn expiring() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_some("a deleted message event will be delivered")
@@ -261,8 +261,8 @@ async fn previously_expired() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_some("a deleted message event will be delivered")
@@ -300,8 +300,8 @@ async fn deleting() {
// Check for delete event
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_some("a deleted message event will be delivered")
@@ -339,8 +339,8 @@ async fn previously_deleted() {
// Check for delete event
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_some("a deleted message event will be delivered")
@@ -384,8 +384,8 @@ async fn previously_purged() {
// Check for delete event
events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_wait("no deleted message will be delivered")
diff --git a/src/event/handlers/stream/test/resume.rs b/src/event/handlers/stream/test/resume.rs
index 34fee4d..835d350 100644
--- a/src/event/handlers/stream/test/resume.rs
+++ b/src/event/handlers/stream/test/resume.rs
@@ -41,8 +41,8 @@ async fn resume() {
.expect("subscribe never fails");
let event = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(event.message == initial_message))
.next()
.expect_some("delivered event for initial message")
@@ -64,8 +64,8 @@ async fn resume() {
// Verify final events
let mut events = resumed
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(later_messages));
while let Some((event, message)) = events.next().expect_ready("event ready").await {
@@ -125,8 +125,8 @@ async fn serial_resume() {
// Check for expected events
let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(initial_messages))
.collect::<Vec<_>>()
.expect_ready("zipping a finite list of events is ready immediately")
@@ -168,8 +168,8 @@ async fn serial_resume() {
// Check for expected events
let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(resume_messages))
.collect::<Vec<_>>()
.expect_ready("zipping a finite list of events is ready immediately")
@@ -211,8 +211,8 @@ async fn serial_resume() {
// Check for expected events
let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(final_messages))
.collect::<Vec<_>>()
.expect_ready("zipping a finite list of events is ready immediately")
diff --git a/src/event/handlers/stream/test/setup.rs b/src/event/handlers/stream/test/setup.rs
index 5335055..992b962 100644
--- a/src/event/handlers/stream/test/setup.rs
+++ b/src/event/handlers/stream/test/setup.rs
@@ -38,8 +38,8 @@ async fn previously_completed() {
// Expect a login created event
let _ = events
- .filter_map(fixtures::event::user)
- .filter_map(fixtures::event::user::created)
+ .filter_map(fixtures::event::stream::user)
+ .filter_map(fixtures::event::stream::user::created)
.filter(|event| future::ready(event.user == owner))
.next()
.expect_some("a login created event is sent")
diff --git a/src/event/handlers/stream/test/token.rs b/src/event/handlers/stream/test/token.rs
index 2008323..e32b489 100644
--- a/src/event/handlers/stream/test/token.rs
+++ b/src/event/handlers/stream/test/token.rs
@@ -43,8 +43,8 @@ async fn terminates_on_token_expiry() {
];
events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
.expect_none("end of stream")
@@ -89,8 +89,8 @@ async fn terminates_on_logout() {
];
events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
.expect_none("end of stream")
@@ -139,8 +139,8 @@ async fn terminates_on_password_change() {
];
events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
.expect_none("end of stream")
diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs
index 15aa2c2..f567eb7 100644
--- a/src/message/handlers/delete/test.rs
+++ b/src/message/handlers/delete/test.rs
@@ -1,4 +1,5 @@
use axum::extract::{Path, State};
+use itertools::Itertools;
use crate::{message::app, test::fixtures};
@@ -29,7 +30,14 @@ pub async fn delete_message() {
// Verify the semantics
let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- assert!(!snapshot.messages.contains(&message));
+ let deleted = snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .exactly_one()
+ .expect("only one message has been deleted");
+ assert_eq!(response.id, deleted.id)
}
#[tokio::test]
diff --git a/src/message/history.rs b/src/message/history.rs
index 1a72c08..7585e1c 100644
--- a/src/message/history.rs
+++ b/src/message/history.rs
@@ -27,12 +27,6 @@ impl History {
self.message.clone()
}
- pub fn as_of(&self, resume_point: Sequence) -> Option<Message> {
- self.events()
- .filter(Sequence::up_to(resume_point))
- .collect()
- }
-
// Snapshot of this message as of all events recorded in this history.
pub fn as_snapshot(&self) -> Option<Message> {
self.events().collect()
diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs
deleted file mode 100644
index a30bb4b..0000000
--- a/src/test/fixtures/event.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-use std::future::{self, Ready};
-
-use crate::event::Event;
-
-pub fn channel(event: Event) -> Ready<Option<channel::Event>> {
- future::ready(match event {
- Event::Channel(channel) => Some(channel),
- _ => None,
- })
-}
-
-pub fn message(event: Event) -> Ready<Option<message::Event>> {
- future::ready(match event {
- Event::Message(event) => Some(event),
- _ => None,
- })
-}
-
-pub fn user(event: Event) -> Ready<Option<user::Event>> {
- future::ready(match event {
- Event::User(event) => Some(event),
- _ => None,
- })
-}
-
-pub mod channel {
- use std::future::{self, Ready};
-
- pub use crate::channel::Event;
- use crate::channel::event;
-
- pub fn created(event: Event) -> Ready<Option<event::Created>> {
- future::ready(match event {
- Event::Created(event) => Some(event),
- Event::Deleted(_) => None,
- })
- }
-
- pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> {
- future::ready(match event {
- Event::Deleted(event) => Some(event),
- Event::Created(_) => None,
- })
- }
-}
-
-pub mod message {
- use std::future::{self, Ready};
-
- pub use crate::message::Event;
- use crate::message::event;
-
- pub fn sent(event: Event) -> Ready<Option<event::Sent>> {
- future::ready(match event {
- Event::Sent(event) => Some(event),
- Event::Deleted(_) => None,
- })
- }
-
- pub fn deleted(event: Event) -> future::Ready<Option<event::Deleted>> {
- future::ready(match event {
- Event::Deleted(event) => Some(event),
- Event::Sent(_) => None,
- })
- }
-}
-
-pub mod user {
- use std::future::{self, Ready};
-
- pub use crate::user::Event;
- use crate::user::event;
-
- pub fn created(event: Event) -> Ready<Option<event::Created>> {
- future::ready(match event {
- Event::Created(event) => Some(event),
- })
- }
-}
diff --git a/src/test/fixtures/event/mod.rs b/src/test/fixtures/event/mod.rs
new file mode 100644
index 0000000..691cdeb
--- /dev/null
+++ b/src/test/fixtures/event/mod.rs
@@ -0,0 +1,74 @@
+use crate::event::Event;
+
+pub mod stream;
+
+pub fn channel(event: Event) -> Option<crate::channel::Event> {
+ match event {
+ Event::Channel(channel) => Some(channel),
+ _ => None,
+ }
+}
+
+pub fn message(event: Event) -> Option<crate::message::Event> {
+ match event {
+ Event::Message(event) => Some(event),
+ _ => None,
+ }
+}
+
+pub fn user(event: Event) -> Option<crate::user::Event> {
+ match event {
+ Event::User(event) => Some(event),
+ _ => None,
+ }
+}
+
+pub mod channel {
+ use crate::channel::{Event, event};
+
+ pub fn created(event: Event) -> Option<event::Created> {
+ match event {
+ Event::Created(event) => Some(event),
+ Event::Deleted(_) => None,
+ }
+ }
+
+ pub fn deleted(event: Event) -> Option<event::Deleted> {
+ match event {
+ Event::Deleted(event) => Some(event),
+ Event::Created(_) => None,
+ }
+ }
+}
+
+pub mod message {
+ use crate::message::{Event, event};
+
+ pub fn sent(event: Event) -> Option<event::Sent> {
+ match event {
+ Event::Sent(event) => Some(event),
+ Event::Deleted(_) => None,
+ }
+ }
+
+ pub fn deleted(event: Event) -> Option<event::Deleted> {
+ match event {
+ Event::Deleted(event) => Some(event),
+ Event::Sent(_) => None,
+ }
+ }
+}
+
+pub mod user {
+ use crate::user::{Event, event};
+
+ // This could be defined as `-> event::Created`. However, I want the interface to be consistent
+ // with the event stream transformers for other types, and we'd have to refactor the return type
+ // to `-> Option<event::Created>` the instant users sprout a second event, anyways.
+ #[allow(clippy::unnecessary_wraps)]
+ pub fn created(event: Event) -> Option<event::Created> {
+ match event {
+ Event::Created(event) => Some(event),
+ }
+ }
+}
diff --git a/src/test/fixtures/event/stream.rs b/src/test/fixtures/event/stream.rs
new file mode 100644
index 0000000..6c2a1bf
--- /dev/null
+++ b/src/test/fixtures/event/stream.rs
@@ -0,0 +1,62 @@
+use std::future::{self, Ready};
+
+use crate::{event::Event, test::fixtures::event};
+
+pub fn channel(event: Event) -> Ready<Option<crate::channel::Event>> {
+ future::ready(event::channel(event))
+}
+
+pub fn message(event: Event) -> Ready<Option<crate::message::Event>> {
+ future::ready(event::message(event))
+}
+
+pub fn user(event: Event) -> Ready<Option<crate::user::Event>> {
+ future::ready(event::user(event))
+}
+
+pub mod channel {
+ use std::future::{self, Ready};
+
+ use crate::{
+ channel::{Event, event},
+ test::fixtures::event::channel,
+ };
+
+ pub fn created(event: Event) -> Ready<Option<event::Created>> {
+ future::ready(channel::created(event))
+ }
+
+ pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> {
+ future::ready(channel::deleted(event))
+ }
+}
+
+pub mod message {
+ use std::future::{self, Ready};
+
+ use crate::{
+ message::{Event, event},
+ test::fixtures::event::message,
+ };
+
+ pub fn sent(event: Event) -> Ready<Option<event::Sent>> {
+ future::ready(message::sent(event))
+ }
+
+ pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> {
+ future::ready(message::deleted(event))
+ }
+}
+
+pub mod user {
+ use std::future::{self, Ready};
+
+ use crate::{
+ test::fixtures::event::user,
+ user::{Event, event},
+ };
+
+ pub fn created(event: Event) -> Ready<Option<event::Created>> {
+ future::ready(user::created(event))
+ }
+}
diff --git a/src/user/history.rs b/src/user/history.rs
index ae7a561..72e0aee 100644
--- a/src/user/history.rs
+++ b/src/user/history.rs
@@ -2,7 +2,7 @@ use super::{
Id, User,
event::{Created, Event},
};
-use crate::event::{Instant, Sequence};
+use crate::event::Instant;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
@@ -24,12 +24,6 @@ impl History {
self.user.clone()
}
- pub fn as_of(&self, resume_point: Sequence) -> Option<User> {
- self.events()
- .filter(Sequence::up_to(resume_point))
- .collect()
- }
-
// Snapshot of this user, as of all events recorded in this history.
pub fn as_snapshot(&self) -> Option<User> {
self.events().collect()
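
The split introduced above separates plain `Event -> Option<T>` transformers (in `fixtures/event/mod.rs`) from `future::ready`-wrapped versions for stream adapters (in `fixtures/event/stream.rs`). A minimal sketch of the synchronous shape, using hypothetical stand-in types rather than the crate's real `Event`, `Sent`, and `Deleted`:

```rust
// Hypothetical stand-ins for the crate's message event types; the real
// definitions live in src/message/event.rs and differ in detail.
#[derive(Debug, PartialEq)]
struct Sent {
    id: u32,
}
#[derive(Debug, PartialEq)]
struct Deleted {
    id: u32,
}

enum Event {
    Sent(Sent),
    Deleted(Deleted),
}

// Transformer in the new synchronous shape: plain `Event -> Option<T>`,
// usable directly with `Iterator::filter_map`. Wrapping the result in
// `std::future::ready` recovers the stream-friendly version, as the new
// stream.rs module does.
fn deleted(event: Event) -> Option<Deleted> {
    match event {
        Event::Deleted(event) => Some(event),
        Event::Sent(_) => None,
    }
}

// Collapses an event list down to its deletions, the same way the
// delete-handler test filters the boot snapshot's events.
fn collect_deletions(events: Vec<Event>) -> Vec<Deleted> {
    events.into_iter().filter_map(deleted).collect()
}

fn main() {
    let events = vec![
        Event::Sent(Sent { id: 1 }),
        Event::Deleted(Deleted { id: 2 }),
        Event::Sent(Sent { id: 3 }),
    ];
    let deletions = collect_deletions(events);
    // Exactly one deletion survives the filter, mirroring the
    // `exactly_one` assertion in the updated test.
    assert_eq!(deletions.len(), 1);
    assert_eq!(deletions[0].id, 2);
}
```

The synchronous form composes with `itertools::Itertools::exactly_one` in tests, while the `stream` module's `future::ready` wrappers keep the old `StreamExt::filter_map` call sites working unchanged.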