From aae24382399f755cfd80e352be7f5aa584aa5470 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Thu, 19 Jun 2025 11:47:31 -0400 Subject: Hoist heartbeat configuration out to the web handler. The _snapshot_ is specifically a snapshot of app state. The purpose of the response struct is to annotate the snapshot with information that isn't from the app, but rather from the request or the web layer. The heartbeat timeout isn't ever used by the app layer in any way; it's used by the Axum handler for `/api/events`, instead. I straight-up missed this when I wrote the original heartbeat changes. --- src/boot/app.rs | 4 +--- src/boot/handlers/boot/mod.rs | 20 +++++++++++++++++++- src/boot/mod.rs | 11 ----------- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/boot/app.rs b/src/boot/app.rs index cd45c38..f531afe 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -3,7 +3,7 @@ use sqlx::sqlite::SqlitePool; use super::Snapshot; use crate::{ channel::{self, repo::Provider as _}, - event::{Heartbeat, repo::Provider as _}, + event::repo::Provider as _, message::repo::Provider as _, name, user::{self, repo::Provider as _}, @@ -21,7 +21,6 @@ impl<'a> Boot<'a> { pub async fn snapshot(&self) -> Result { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; - let heartbeat = Heartbeat::TIMEOUT; let users = tx.users().all(resume_point).await?; let channels = tx.channels().all(resume_point).await?; @@ -46,7 +45,6 @@ impl<'a> Boot<'a> { Ok(Snapshot { resume_point, - heartbeat, users, channels, messages, diff --git a/src/boot/handlers/boot/mod.rs b/src/boot/handlers/boot/mod.rs index 010f57b..49691f7 100644 --- a/src/boot/handlers/boot/mod.rs +++ b/src/boot/handlers/boot/mod.rs @@ -1,17 +1,26 @@ +use std::time::Duration; + use axum::{ extract::{Json, State}, response::{self, IntoResponse}, }; +use serde::Serialize; -use crate::{app::App, boot::Snapshot, error::Internal, token::extract::Identity, user::User}; +use crate::{ + app::App, boot::Snapshot, error::Internal, event::Heartbeat, token::extract::Identity, + user::User, +}; #[cfg(test)] mod test; pub async fn handler(State(app): State, identity: Identity) -> Result { let snapshot = app.boot().snapshot().await?; + let heartbeat = Heartbeat::TIMEOUT; + Ok(Response { user: identity.user, + heartbeat, snapshot, }) } @@ -19,6 +28,8 @@ pub async fn handler(State(app): State, identity: Identity) -> Result(duration: &Duration, serializer: S) -> Result +where + S: serde::Serializer, +{ + duration.as_secs().serialize(serializer) +} diff --git a/src/boot/mod.rs b/src/boot/mod.rs index 48da4f0..3fc2c9e 100644 --- a/src/boot/mod.rs +++ b/src/boot/mod.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use serde::Serialize; use crate::{channel::Channel, event::Sequence, message::Message, user::User}; @@ -10,16 +8,7 @@ pub mod handlers; #[derive(serde::Serialize)] pub struct Snapshot { pub resume_point: Sequence, - #[serde(serialize_with = "as_seconds")] - pub heartbeat: Duration, pub users: Vec, pub channels: Vec, pub messages: Vec, } - -fn as_seconds(duration: &Duration, serializer: S) -> Result -where - S: serde::Serializer, -{ - duration.as_secs().serialize(serializer) -} -- cgit v1.2.3 From 057bbef5f37a4051615ad23661a0b4853b61162e Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Thu, 19 Jun 2025 00:49:49 -0400 Subject: Support querying event sequences via iterators or streams. These filters are meant to be used with, respectively, `Iterator::filter_map` and `StreamExt::filter_map`. 
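As a rough illustration of the two shapes involved — using a simplified stand-in Event enum rather than the crate's real event and fixture types — the same filtering logic can serve both `Iterator::filter_map` and `StreamExt::filter_map` once it is lifted with `future::ready`, which is exactly the adapter described below:

    use std::future::{self, Ready};

    use futures::{executor::block_on, stream::{self, StreamExt as _}};

    // Stand-in event type, purely for illustration; the real crate has its
    // own Event hierarchy.
    #[derive(Clone, Debug, PartialEq)]
    enum Event {
        Message(String),
        Channel(String),
    }

    // Plain filter, usable with `Iterator::filter_map`.
    fn message(event: Event) -> Option<String> {
        match event {
            Event::Message(body) => Some(body),
            Event::Channel(_) => None,
        }
    }

    // The same filter lifted into an already-resolved future, usable with
    // `StreamExt::filter_map`.
    fn message_stream(event: Event) -> Ready<Option<String>> {
        future::ready(message(event))
    }

    fn main() {
        let events = vec![
            Event::Message("beep".into()),
            Event::Channel("general".into()),
        ];

        // Iterator flavour.
        let from_iter: Vec<_> = events.clone().into_iter().filter_map(message).collect();

        // Stream flavour: the adapter is just `future::ready`.
        let from_stream: Vec<_> =
            block_on(stream::iter(events).filter_map(message_stream).collect());

        assert_eq!(from_iter, from_stream);
    }
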
The two operations are conceptually the same - they pass an item from the underlying sequence to a function that returns an option, drops the values for which the function returns `None`, and yields the value inside of `Some` in the resulting sequence. However, `Iterator::filter_map` takes a function from the iterator elements to `Option`. `StreamExt::filter_map` takes a function from the iterator elements to _a `Future` whose output is `Option`_. As such, you can't easily use functions designed for one use case, for the other. You need an adapter - conventionally, `futures::ready`, if you have a non-async function and need an async one. This provides two sets of sequence filters: * `crate::test::fixtures::event` contains functions which return `Option` directly, and which are intended for use with `Iterator::filter_map`. * `crate::test::fixtures::event::stream` contains lifted versions that return a `Future`, and which are intended for use with `StreamExt::filter_map`. The lifting is done purely manually. I spent a lot of time writing clever-er versions before deciding on this; those were fun to write, but hell to read and not meaningfully better, and this is test support code, so we want it to be dumb and obvious. Complexity for the sake of intellectual satisfaction is a huge antifeature in this context. --- src/channel/handlers/create/test.rs | 4 +- src/channel/handlers/send/test.rs | 4 +- src/event/handlers/stream/test/channel.rs | 28 +++++------ src/event/handlers/stream/test/invite.rs | 8 ++-- src/event/handlers/stream/test/message.rs | 36 +++++++------- src/event/handlers/stream/test/resume.rs | 20 ++++---- src/event/handlers/stream/test/setup.rs | 4 +- src/event/handlers/stream/test/token.rs | 12 ++--- src/test/fixtures/event.rs | 79 ------------------------------- src/test/fixtures/event/mod.rs | 74 +++++++++++++++++++++++++++++ src/test/fixtures/event/stream.rs | 62 ++++++++++++++++++++++++ 11 files changed, 194 insertions(+), 137 deletions(-) delete mode 100644 src/test/fixtures/event.rs create mode 100644 src/test/fixtures/event/mod.rs create mode 100644 src/test/fixtures/event/stream.rs diff --git a/src/channel/handlers/create/test.rs b/src/channel/handlers/create/test.rs index 3c770cf..595a879 100644 --- a/src/channel/handlers/create/test.rs +++ b/src/channel/handlers/create/test.rs @@ -47,8 +47,8 @@ async fn new_channel() { .subscribe(resume_point) .await .expect("subscribing never fails") - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::created) .filter(|event| future::ready(event.channel == response)); let event = events.next().expect_some("creation event published").await; diff --git a/src/channel/handlers/send/test.rs b/src/channel/handlers/send/test.rs index f43f901..7204ca4 100644 --- a/src/channel/handlers/send/test.rs +++ b/src/channel/handlers/send/test.rs @@ -45,8 +45,8 @@ async fn messages_in_order() { .subscribe(resume_point) .await .expect("subscribing to a valid channel succeeds") - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(requests)); while let Some((event, (sent_at, body))) = events diff --git a/src/event/handlers/stream/test/channel.rs b/src/event/handlers/stream/test/channel.rs index 187c3c3..2b87ce2 100644 --- a/src/event/handlers/stream/test/channel.rs +++ 
b/src/event/handlers/stream/test/channel.rs @@ -35,8 +35,8 @@ async fn creating() { // Verify channel created event events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::created) .filter(|event| future::ready(event.channel == channel)) .next() .expect_some("channel created event is delivered") @@ -74,8 +74,8 @@ async fn previously_created() { // Verify channel created event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::created) .filter(|event| future::ready(event.channel == channel)) .next() .expect_some("channel created event is delivered") @@ -111,8 +111,8 @@ async fn expiring() { // Check for expiry event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_some("a deleted channel event will be delivered") @@ -148,8 +148,8 @@ async fn previously_expired() { // Check for expiry event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_some("a deleted channel event will be delivered") @@ -185,8 +185,8 @@ async fn deleting() { // Check for delete event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_some("a deleted channel event will be delivered") @@ -222,8 +222,8 @@ async fn previously_deleted() { // Check for expiry event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_some("a deleted channel event will be delivered") @@ -264,8 +264,8 @@ async fn previously_purged() { // Check for expiry event events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_wait("deleted channel events not delivered") diff --git a/src/event/handlers/stream/test/invite.rs b/src/event/handlers/stream/test/invite.rs index c8e12fb..01372ce 100644 --- a/src/event/handlers/stream/test/invite.rs +++ b/src/event/handlers/stream/test/invite.rs @@ -37,8 +37,8 @@ async fn accepting_invite() { // Expect a login created event let _ = events - .filter_map(fixtures::event::user) - .filter_map(fixtures::event::user::created) + .filter_map(fixtures::event::stream::user) + .filter_map(fixtures::event::stream::user::created) .filter(|event| future::ready(event.user == joiner)) .next() .expect_some("a login created event is sent") @@ -78,8 +78,8 @@ async fn previously_accepted_invite() { // Expect a login created event let _ = 
events - .filter_map(fixtures::event::user) - .filter_map(fixtures::event::user::created) + .filter_map(fixtures::event::stream::user) + .filter_map(fixtures::event::stream::user::created) .filter(|event| future::ready(event.user == joiner)) .next() .expect_some("a login created event is sent") diff --git a/src/event/handlers/stream/test/message.rs b/src/event/handlers/stream/test/message.rs index a80c896..4369996 100644 --- a/src/event/handlers/stream/test/message.rs +++ b/src/event/handlers/stream/test/message.rs @@ -44,8 +44,8 @@ async fn sending() { // Verify that an event is delivered let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == message)) .next() .expect_some("delivered message sent event") @@ -89,8 +89,8 @@ async fn previously_sent() { // Verify that an event is delivered let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == message)) .next() .expect_some("delivered message sent event") @@ -135,8 +135,8 @@ async fn sent_in_multiple_channels() { // Verify the structure of the response. let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .take(messages.len()) .collect::>() .expect_ready("events ready") @@ -177,8 +177,8 @@ async fn sent_sequentially() { // Verify the expected events in the expected order let mut events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))); for message in &messages { @@ -222,8 +222,8 @@ async fn expiring() { // Check for expiry event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -261,8 +261,8 @@ async fn previously_expired() { // Check for expiry event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -300,8 +300,8 @@ async fn deleting() { // Check for delete event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -339,8 +339,8 @@ async fn previously_deleted() { // Check for delete event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + 
.filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -384,8 +384,8 @@ async fn previously_purged() { // Check for delete event events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_wait("no deleted message will be delivered") diff --git a/src/event/handlers/stream/test/resume.rs b/src/event/handlers/stream/test/resume.rs index 34fee4d..835d350 100644 --- a/src/event/handlers/stream/test/resume.rs +++ b/src/event/handlers/stream/test/resume.rs @@ -41,8 +41,8 @@ async fn resume() { .expect("subscribe never fails"); let event = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == initial_message)) .next() .expect_some("delivered event for initial message") @@ -64,8 +64,8 @@ async fn resume() { // Verify final events let mut events = resumed - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(later_messages)); while let Some((event, message)) = events.next().expect_ready("event ready").await { @@ -125,8 +125,8 @@ async fn serial_resume() { // Check for expected events let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(initial_messages)) .collect::>() .expect_ready("zipping a finite list of events is ready immediately") @@ -168,8 +168,8 @@ async fn serial_resume() { // Check for expected events let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(resume_messages)) .collect::>() .expect_ready("zipping a finite list of events is ready immediately") @@ -211,8 +211,8 @@ async fn serial_resume() { // Check for expected events let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(final_messages)) .collect::>() .expect_ready("zipping a finite list of events is ready immediately") diff --git a/src/event/handlers/stream/test/setup.rs b/src/event/handlers/stream/test/setup.rs index 5335055..992b962 100644 --- a/src/event/handlers/stream/test/setup.rs +++ b/src/event/handlers/stream/test/setup.rs @@ -38,8 +38,8 @@ async fn previously_completed() { // Expect a login created event let _ = events - .filter_map(fixtures::event::user) - .filter_map(fixtures::event::user::created) + .filter_map(fixtures::event::stream::user) + .filter_map(fixtures::event::stream::user::created) .filter(|event| future::ready(event.user == owner)) .next() .expect_some("a login created event is sent") diff --git a/src/event/handlers/stream/test/token.rs b/src/event/handlers/stream/test/token.rs index 2008323..e32b489 100644 --- 
a/src/event/handlers/stream/test/token.rs +++ b/src/event/handlers/stream/test/token.rs @@ -43,8 +43,8 @@ async fn terminates_on_token_expiry() { ]; events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .expect_none("end of stream") @@ -89,8 +89,8 @@ async fn terminates_on_logout() { ]; events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .expect_none("end of stream") @@ -139,8 +139,8 @@ async fn terminates_on_password_change() { ]; events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .expect_none("end of stream") diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs deleted file mode 100644 index a30bb4b..0000000 --- a/src/test/fixtures/event.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::future::{self, Ready}; - -use crate::event::Event; - -pub fn channel(event: Event) -> Ready> { - future::ready(match event { - Event::Channel(channel) => Some(channel), - _ => None, - }) -} - -pub fn message(event: Event) -> Ready> { - future::ready(match event { - Event::Message(event) => Some(event), - _ => None, - }) -} - -pub fn user(event: Event) -> Ready> { - future::ready(match event { - Event::User(event) => Some(event), - _ => None, - }) -} - -pub mod channel { - use std::future::{self, Ready}; - - pub use crate::channel::Event; - use crate::channel::event; - - pub fn created(event: Event) -> Ready> { - future::ready(match event { - Event::Created(event) => Some(event), - Event::Deleted(_) => None, - }) - } - - pub fn deleted(event: Event) -> Ready> { - future::ready(match event { - Event::Deleted(event) => Some(event), - Event::Created(_) => None, - }) - } -} - -pub mod message { - use std::future::{self, Ready}; - - pub use crate::message::Event; - use crate::message::event; - - pub fn sent(event: Event) -> Ready> { - future::ready(match event { - Event::Sent(event) => Some(event), - Event::Deleted(_) => None, - }) - } - - pub fn deleted(event: Event) -> future::Ready> { - future::ready(match event { - Event::Deleted(event) => Some(event), - Event::Sent(_) => None, - }) - } -} - -pub mod user { - use std::future::{self, Ready}; - - pub use crate::user::Event; - use crate::user::event; - - pub fn created(event: Event) -> Ready> { - future::ready(match event { - Event::Created(event) => Some(event), - }) - } -} diff --git a/src/test/fixtures/event/mod.rs b/src/test/fixtures/event/mod.rs new file mode 100644 index 0000000..691cdeb --- /dev/null +++ b/src/test/fixtures/event/mod.rs @@ -0,0 +1,74 @@ +use crate::event::Event; + +pub mod stream; + +pub fn channel(event: Event) -> Option { + match event { + Event::Channel(channel) => Some(channel), + _ => None, + } +} + +pub fn message(event: Event) -> Option { + match event { + Event::Message(event) => Some(event), + _ => None, + } +} + +pub fn user(event: Event) -> Option { + match event { + Event::User(event) => Some(event), + _ => 
None, + } +} + +pub mod channel { + use crate::channel::{Event, event}; + + pub fn created(event: Event) -> Option { + match event { + Event::Created(event) => Some(event), + Event::Deleted(_) => None, + } + } + + pub fn deleted(event: Event) -> Option { + match event { + Event::Deleted(event) => Some(event), + Event::Created(_) => None, + } + } +} + +pub mod message { + use crate::message::{Event, event}; + + pub fn sent(event: Event) -> Option { + match event { + Event::Sent(event) => Some(event), + Event::Deleted(_) => None, + } + } + + pub fn deleted(event: Event) -> Option { + match event { + Event::Deleted(event) => Some(event), + Event::Sent(_) => None, + } + } +} + +pub mod user { + use crate::user::{Event, event}; + + // This could be defined as `-> event::Created`. However, I want the interface to be consistent + // with the event stream transformers for other types, and we'd have to refactor the return type + // to `-> Option` the instant users sprout a second event, anyways. + #[allow(clippy::unnecessary_wraps)] + pub fn created(event: Event) -> Option { + match event { + Event::Created(event) => Some(event), + } + } +} diff --git a/src/test/fixtures/event/stream.rs b/src/test/fixtures/event/stream.rs new file mode 100644 index 0000000..6c2a1bf --- /dev/null +++ b/src/test/fixtures/event/stream.rs @@ -0,0 +1,62 @@ +use std::future::{self, Ready}; + +use crate::{event::Event, test::fixtures::event}; + +pub fn channel(event: Event) -> Ready> { + future::ready(event::channel(event)) +} + +pub fn message(event: Event) -> Ready> { + future::ready(event::message(event)) +} + +pub fn user(event: Event) -> Ready> { + future::ready(event::user(event)) +} + +pub mod channel { + use std::future::{self, Ready}; + + use crate::{ + channel::{Event, event}, + test::fixtures::event::channel, + }; + + pub fn created(event: Event) -> Ready> { + future::ready(channel::created(event)) + } + + pub fn deleted(event: Event) -> Ready> { + future::ready(channel::deleted(event)) + } +} + +pub mod message { + use std::future::{self, Ready}; + + use crate::{ + message::{Event, event}, + test::fixtures::event::message, + }; + + pub fn sent(event: Event) -> Ready> { + future::ready(message::sent(event)) + } + + pub fn deleted(event: Event) -> future::Ready> { + future::ready(message::deleted(event)) + } +} + +pub mod user { + use std::future::{self, Ready}; + + use crate::{ + test::fixtures::event::user, + user::{Event, event}, + }; + + pub fn created(event: Event) -> Ready> { + future::ready(user::created(event)) + } +} -- cgit v1.2.3 From 4b522c804db8155f74a734c95ed962d996b2c692 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 18 Jun 2025 23:33:02 -0400 Subject: Include historical events in the boot response. The returned events are all events up to and including the `resume_point` in the same response. If combined with the events from `/api/events?resume_point=x`, using the same `resume_point`, the client will have a complete event history, less any events from histories that have been purged. 
--- docs/api/boot.md | 58 ++++++++++++++++++- src/boot/app.rs | 50 +++++++++++++---- src/boot/handlers/boot/test.rs | 125 ++++++++++++++++++++++++++++++++++++++++- src/boot/mod.rs | 5 +- 4 files changed, 221 insertions(+), 17 deletions(-) diff --git a/docs/api/boot.md b/docs/api/boot.md index 7e3dda3..1f6e619 100644 --- a/docs/api/boot.md +++ b/docs/api/boot.md @@ -21,8 +21,8 @@ sequenceDiagram Client initialization serves three purposes: - It confirms that the client's [identity token](./authentication.md) is valid, and tells the client what user that token is associated with. -- It provides an initial snapshot of the state of the service. -- It provides a resume point for the [event stream](./events.md), which allows clients to consume events starting from the moment the snapshot was created. +- It provides an initial event collection. +- It provides a resume point for the [event stream](./events.md), which allows clients to consume events starting after the initial event collection. ## `GET /api/boot` @@ -43,6 +43,56 @@ This endpoint will respond with a status of }, "resume_point": 1312, "heartbeat": 30, + "events": [ + { + "type": "user", + "event": "created", + "at": "2025-04-14T23:58:10.421901Z", + "id": "U1234abcd", + "name": "example username" + }, + { + "type": "channel", + "event": "created", + "at": "2025-04-14T23:58:11.421901Z", + "id": "C1234abcd", + "name": "nonsense and such" + }, + { + "type": "message", + "event": "sent", + "at": "2024-09-27T23:19:10.208147Z", + "channel": "C1234abcd", + "sender": "U1234abcd", + "id": "M1312acab", + "body": "beep" + }, + { + "type": "message", + "event": "sent", + "at": "2025-06-19T15:14:40.431627Z", + "channel": "Ccfdryfdb4krpy77", + "sender": "U888j6fyc8ccrnkf", + "id": "Mc6jk823wjc82734", + "body": "test" + }, + { + "type": "channel", + "event": "created", + "at": "2025-06-19T15:14:44.764263Z", + "id": "C2d9y6wckph3n36x", + "name": "noob" + }, + { + "type": "message", + "event": "sent", + "at": "2025-06-19T15:29:47.376455Z", + "channel": "Ccfdryfdb4krpy77", + "sender": "U888j6fyc8ccrnkf", + "id": "M3twnj7rfk2ph744", + "body": "test" + } + ], "users": [ { "id": "U1234abcd", @@ -75,10 +125,14 @@ The response will include the following fields: | `user` | object | The details of the caller's identity. | | `resume_point` | integer | A resume point for [events](./events.md), such that the event stream will begin immediately after the included snapshot. | | `heartbeat` | integer | The [heartbeat timeout](./events.md#heartbeat-events), in seconds, for events. | +| `events` | array of object | The events on the server up to the resume point. | | `users` | array of object | A snapshot of the users present in the service. | | `channels` | array of object | A snapshot of the channels present in the service. | | `messages` | array of object | A snapshot of the messages present in the service. | +Each element of the +`events` object is an event, as described in [Events](./events.md). Events are provided in the same order as they would appear in the event stream response. 
+ The `user` object will include the following fields: | Field | Type | Description | diff --git a/src/boot/app.rs b/src/boot/app.rs index f531afe..690bcf4 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -1,10 +1,11 @@ +use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; use super::Snapshot; use crate::{ channel::{self, repo::Provider as _}, - event::repo::Provider as _, - message::repo::Provider as _, + event::{Event, Sequence, repo::Provider as _}, + message::{self, repo::Provider as _}, name, user::{self, repo::Provider as _}, }; @@ -22,32 +23,59 @@ impl<'a> Boot<'a> { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; - let users = tx.users().all(resume_point).await?; - let channels = tx.channels().all(resume_point).await?; - let messages = tx.messages().all(resume_point).await?; + let user_histories = tx.users().all(resume_point).await?; + let channel_histories = tx.channels().all(resume_point).await?; + let message_histories = tx.messages().all(resume_point).await?; tx.commit().await?; - let users = users - .into_iter() + let users = user_histories + .iter() .filter_map(|user| user.as_of(resume_point)) .collect(); - let channels = channels - .into_iter() + let channels = channel_histories + .iter() .filter_map(|channel| channel.as_of(resume_point)) .collect(); - let messages = messages - .into_iter() + let messages = message_histories + .iter() .filter_map(|message| message.as_of(resume_point)) .collect(); + let user_events = user_histories + .iter() + .map(user::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::up_to(resume_point)) + .map(Event::from); + + let channel_events = channel_histories + .iter() + .map(channel::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::up_to(resume_point)) + .map(Event::from); + + let message_events = message_histories + .iter() + .map(message::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::up_to(resume_point)) + .map(Event::from); + + let events = user_events + .merge_by(channel_events, Sequence::merge) + .merge_by(message_events, Sequence::merge) + .collect(); + Ok(Snapshot { resume_point, users, channels, messages, + events, }) } } diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs index 0a7622b..d68618e 100644 --- a/src/boot/handlers/boot/test.rs +++ b/src/boot/handlers/boot/test.rs @@ -1,4 +1,5 @@ use axum::extract::State; +use itertools::Itertools as _; use crate::test::fixtures; @@ -15,7 +16,7 @@ async fn returns_identity() { } #[tokio::test] -async fn includes_logins() { +async fn includes_users() { let app = fixtures::scratch_app().await; let spectator = fixtures::user::create(&app, &fixtures::now()).await; @@ -25,6 +26,16 @@ async fn includes_logins() { .expect("boot always succeeds"); assert!(response.snapshot.users.contains(&spectator)); + + let created = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::user) + .filter_map(fixtures::event::user::created) + .exactly_one() + .expect("only one user has been created"); + assert_eq!(spectator, created.user) } #[tokio::test] @@ -38,6 +49,16 @@ async fn includes_channels() { .expect("boot always succeeds"); assert!(response.snapshot.channels.contains(&channel)); + + let created = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + assert_eq!(channel, created.channel); } #[tokio::test] @@ 
-53,6 +74,16 @@ async fn includes_messages() { .expect("boot always succeeds"); assert!(response.snapshot.messages.contains(&message)); + + let sent = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .exactly_one() + .expect("only one message has been sent"); + assert_eq!(message, sent.message); } #[tokio::test] @@ -74,6 +105,29 @@ async fn excludes_expired_messages() { .expect("boot always succeeds"); assert!(!response.snapshot.messages.contains(&expired_message)); + + let sent = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .exactly_one() + .expect("only one message has been sent"); + // We don't expect `expired_message` to match the event exactly, as the body will have been + // tombstoned and the message given a `deleted_at` date. + assert_eq!(expired_message.id, sent.message.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .exactly_one() + .expect("only one message has expired"); + assert_eq!(expired_message.id, deleted.id); } #[tokio::test] @@ -94,6 +148,29 @@ async fn excludes_deleted_messages() { .expect("boot always succeeds"); assert!(!response.snapshot.messages.contains(&deleted_message)); + + let sent = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .exactly_one() + .expect("only one message has been sent"); + // We don't expect `deleted_message` to match the event exactly, as the body will have been + // tombstoned and the message given a `deleted_at` date. + assert_eq!(deleted_message.id, sent.message.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .exactly_one() + .expect("only one message has been deleted"); + assert_eq!(deleted_message.id, deleted.id); } #[tokio::test] @@ -112,6 +189,29 @@ async fn excludes_expired_channels() { .expect("boot always succeeds"); assert!(!response.snapshot.channels.contains(&expired_channel)); + + let created = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + // We don't expect `expired_channel` to match the event exactly, as the name will have been + // tombstoned and the channel given a `deleted_at` date. + assert_eq!(expired_channel.id, created.channel.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .exactly_one() + .expect("only one channel has expired"); + assert_eq!(expired_channel.id, deleted.id); } #[tokio::test] @@ -130,4 +230,27 @@ async fn excludes_deleted_channels() { .expect("boot always succeeds"); assert!(!response.snapshot.channels.contains(&deleted_channel)); + + let created = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + // We don't expect `deleted_channel` to match the event exactly, as the name will have been + // tombstoned and the channel given a `deleted_at` date. 
+ assert_eq!(deleted_channel.id, created.channel.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .exactly_one() + .expect("only one channel has been deleted"); + assert_eq!(deleted_channel.id, deleted.id); } diff --git a/src/boot/mod.rs b/src/boot/mod.rs index 3fc2c9e..148e87d 100644 --- a/src/boot/mod.rs +++ b/src/boot/mod.rs @@ -1,6 +1,4 @@ -use serde::Serialize; - -use crate::{channel::Channel, event::Sequence, message::Message, user::User}; +use crate::{channel::Channel, event::Event, event::Sequence, message::Message, user::User}; pub mod app; pub mod handlers; @@ -11,4 +9,5 @@ pub struct Snapshot { pub users: Vec, pub channels: Vec, pub messages: Vec, + pub events: Vec, } -- cgit v1.2.3 From 639f4b422adb0a6fc809161dd816d8382cf88138 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Thu, 19 Jun 2025 11:33:17 -0400 Subject: Boot the client by consuming events. We use the same event processing glue that the client has for keeping up with live events, which means that a significant chunk of state management code goes away entirely. --- ui/lib/session.svelte.js | 12 ++++-------- ui/lib/state/remote/channels.svelte.js | 9 --------- ui/lib/state/remote/messages.svelte.js | 9 --------- ui/lib/state/remote/state.svelte.js | 22 ++++++++++------------ ui/lib/state/remote/users.svelte.js | 11 +---------- 5 files changed, 15 insertions(+), 48 deletions(-) diff --git a/ui/lib/session.svelte.js b/ui/lib/session.svelte.js index d742dbe..838401c 100644 --- a/ui/lib/session.svelte.js +++ b/ui/lib/session.svelte.js @@ -64,27 +64,23 @@ class Session { ), ); - static boot({ user, users, channels, messages, resume_point, heartbeat }) { + static boot({ user, resume_point, heartbeat, events }) { const remote = r.State.boot({ currentUser: user, - users, - channels, - messages, resumePoint: resume_point, heartbeat, + events, }); const local = l.Channels.fromLocalStorage(); return new Session(remote, local); } - reboot({ user, users, channels, messages, resume_point, heartbeat }) { + reboot({ user, resume_point, heartbeat, events }) { this.remote = r.State.boot({ currentUser: user, - users, - channels, - messages, resumePoint: resume_point, heartbeat, + events, }); } diff --git a/ui/lib/state/remote/channels.svelte.js b/ui/lib/state/remote/channels.svelte.js index b2888cb..1e40075 100644 --- a/ui/lib/state/remote/channels.svelte.js +++ b/ui/lib/state/remote/channels.svelte.js @@ -19,15 +19,6 @@ class Channel { export class Channels { all = $state([]); - static boot(channels) { - const all = channels.map((channel) => Channel.boot(channel)); - return new Channels({ all }); - } - - constructor({ all }) { - this.all = all; - } - add({ at, id, name }) { this.all.push(Channel.boot({ at, id, name })); } diff --git a/ui/lib/state/remote/messages.svelte.js b/ui/lib/state/remote/messages.svelte.js index 7ce28b4..1be001b 100644 --- a/ui/lib/state/remote/messages.svelte.js +++ b/ui/lib/state/remote/messages.svelte.js @@ -26,15 +26,6 @@ class Message { export class Messages { all = $state([]); - static boot(messages) { - const all = messages.map(Message.boot); - return new Messages({ all }); - } - - constructor({ all }) { - this.all = all; - } - add({ id, at, channel, sender, body }) { const message = Message.boot({ id, at, channel, sender, body }); this.all.push(message); diff --git a/ui/lib/state/remote/state.svelte.js b/ui/lib/state/remote/state.svelte.js index a30e0fe..fb46489 100644 --- 
a/ui/lib/state/remote/state.svelte.js +++ b/ui/lib/state/remote/state.svelte.js @@ -4,27 +4,25 @@ import { Messages } from './messages.svelte.js'; export class State { currentUser = $state(); - users = $state(); - channels = $state(); - messages = $state(); + users = $state(new Users()); + channels = $state(new Channels()); + messages = $state(new Messages()); - static boot({ currentUser, heartbeat, users, channels, messages, resumePoint }) { - return new State({ + static boot({ currentUser, heartbeat, resumePoint, events }) { + const state = new State({ currentUser: User.boot(currentUser), heartbeat, - users: Users.boot(users), - channels: Channels.boot(channels), - messages: Messages.boot(messages), resumePoint, }); + for (const event of events) { + state.onEvent(event); + } + return state; } - constructor({ currentUser, heartbeat, users, channels, messages, resumePoint }) { + constructor({ currentUser, heartbeat, resumePoint }) { this.currentUser = currentUser; this.heartbeat = heartbeat; - this.users = users; - this.channels = channels; - this.messages = messages; this.resumePoint = resumePoint; } diff --git a/ui/lib/state/remote/users.svelte.js b/ui/lib/state/remote/users.svelte.js index a15d1da..391ab3a 100644 --- a/ui/lib/state/remote/users.svelte.js +++ b/ui/lib/state/remote/users.svelte.js @@ -16,16 +16,7 @@ export class User { } export class Users { - all = $state(); - - static boot(users) { - const all = new SvelteMap(users.map((user) => [user.id, User.boot(user)])); - return new Users({ all }); - } - - constructor({ all }) { - this.all = all; - } + all = $state(new SvelteMap()); add({ id, name }) { this.all.set(id, new User(id, name)); -- cgit v1.2.3 From 7778cdf0c495a04f4f5f3f85b78348c8037a5771 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 20 Jun 2025 19:47:46 -0400 Subject: Remove the snapshot fields from `/api/boot`. Clients now _must_ construct their state from the event stream; it is no longer possible for them to delegate that work to the server. --- docs/api/boot.md | 50 ------------------------------------- src/boot/app.rs | 30 +++++----------------- src/boot/handlers/boot/test.rs | 22 +++------------- src/boot/mod.rs | 5 +--- src/channel/handlers/create/test.rs | 10 +++++++- src/channel/handlers/delete/test.rs | 12 ++++++++- src/channel/history.rs | 6 ----- src/message/handlers/delete/test.rs | 10 +++++++- src/message/history.rs | 6 ----- src/user/history.rs | 8 +----- 10 files changed, 41 insertions(+), 118 deletions(-) diff --git a/docs/api/boot.md b/docs/api/boot.md index 1f6e619..f6e6dc2 100644 --- a/docs/api/boot.md +++ b/docs/api/boot.md @@ -92,28 +92,6 @@ This endpoint will respond with a status of "id": "M3twnj7rfk2ph744", "body": "test" } - ], - "users": [ - { - "id": "U1234abcd", - "name": "example username" - } - ], - "channels": [ - { - "at": "2025-04-14T23:58:11.421901Z", - "name": "nonsense and such", - "id": "C1234abcd" - } - ], - "messages": [ - { - "at": "2024-09-27T23:19:10.208147Z", - "channel": "C1234abcd", - "sender": "U1234abcd", - "id": "M1312acab", - "body": "beep" - } ] } ``` @@ -126,9 +104,6 @@ The response will include the following fields: | `resume_point` | integer | A resume point for [events](./events.md), such that the event stream will begin immediately after the included snapshot. | | `heartbeat` | integer | The [heartbeat timeout](./events.md#heartbeat-events), in seconds, for events. | | `events` | array of object | The events on the server up to the resume point. 
| -| `users` | array of object | A snapshot of the users present in the service. | -| `channels` | array of object | A snapshot of the channels present in the service. | -| `messages` | array of object | A snapshot of the messages present in the service. | Each element of the `events` object is an event, as described in [Events](./events.md). Events are provided in the same order as they would appear in the event stream response. @@ -139,28 +114,3 @@ The `user` object will include the following fields: | :----- | :----- | :--------------------------------------- | | `name` | string | The name of the caller's login identity. | | `id` | string | The ID of the caller's login identity. | - -Each element of the `users` array describes a distinct user, and will include the following fields: - -| Field | Type | Description | -| :----- | :----- | :----------------------------------------------------------------------------------------------------------------------------------- | -| `name` | string | The name for the user. | -| `id` | string | A unique identifier for the user. This can be used to associate the user with other events, or to make API calls targeting the user. | - -Each element of the `channels` array describes a distinct channel, and will include the following fields: - -| Field | Type | Description | -| :----- | :-------- | :-------------------------------------------------------------------------------------------------------------------------------------------- | -| `at` | timestamp | The moment the channel was created. | -| `name` | string | The name for the channel. | -| `id` | string | A unique identifier for the channel. This can be used to associate the channel with other events, or to make API calls targeting the channel. | - -Each element of the `messages` array describes a distinct message, and will include the following fields: - -| Field | Type | Description | -| :-------- | :-------- | :-------------------------------------------------------------------------------------------------------------------------------------------- | -| `at` | timestamp | The moment the message was sent. | -| `channel` | string | The ID of the channel the message was sent to. | -| `sender` | string | The ID of the user that sent the message. | -| `id` | string | A unique identifier for the message. This can be used to associate the message with other events, or to make API calls targeting the message. | -| `body` | string | The text of the message. 
| diff --git a/src/boot/app.rs b/src/boot/app.rs index 690bcf4..89eec12 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -23,42 +23,27 @@ impl<'a> Boot<'a> { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; - let user_histories = tx.users().all(resume_point).await?; - let channel_histories = tx.channels().all(resume_point).await?; - let message_histories = tx.messages().all(resume_point).await?; + let users = tx.users().all(resume_point).await?; + let channels = tx.channels().all(resume_point).await?; + let messages = tx.messages().all(resume_point).await?; tx.commit().await?; - let users = user_histories - .iter() - .filter_map(|user| user.as_of(resume_point)) - .collect(); - - let channels = channel_histories - .iter() - .filter_map(|channel| channel.as_of(resume_point)) - .collect(); - - let messages = message_histories - .iter() - .filter_map(|message| message.as_of(resume_point)) - .collect(); - - let user_events = user_histories + let user_events = users .iter() .map(user::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::up_to(resume_point)) .map(Event::from); - let channel_events = channel_histories + let channel_events = channels .iter() .map(channel::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::up_to(resume_point)) .map(Event::from); - let message_events = message_histories + let message_events = messages .iter() .map(message::History::events) .kmerge_by(Sequence::merge) @@ -72,9 +57,6 @@ impl<'a> Boot<'a> { Ok(Snapshot { resume_point, - users, - channels, - messages, events, }) } diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs index d68618e..1e590a7 100644 --- a/src/boot/handlers/boot/test.rs +++ b/src/boot/handlers/boot/test.rs @@ -25,8 +25,6 @@ async fn includes_users() { .await .expect("boot always succeeds"); - assert!(response.snapshot.users.contains(&spectator)); - let created = response .snapshot .events @@ -48,8 +46,6 @@ async fn includes_channels() { .await .expect("boot always succeeds"); - assert!(response.snapshot.channels.contains(&channel)); - let created = response .snapshot .events @@ -73,8 +69,6 @@ async fn includes_messages() { .await .expect("boot always succeeds"); - assert!(response.snapshot.messages.contains(&message)); - let sent = response .snapshot .events @@ -87,7 +81,7 @@ async fn includes_messages() { } #[tokio::test] -async fn excludes_expired_messages() { +async fn includes_expired_messages() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; @@ -104,8 +98,6 @@ async fn excludes_expired_messages() { .await .expect("boot always succeeds"); - assert!(!response.snapshot.messages.contains(&expired_message)); - let sent = response .snapshot .events @@ -131,7 +123,7 @@ async fn excludes_expired_messages() { } #[tokio::test] -async fn excludes_deleted_messages() { +async fn includes_deleted_messages() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; @@ -147,8 +139,6 @@ async fn excludes_deleted_messages() { .await .expect("boot always succeeds"); - assert!(!response.snapshot.messages.contains(&deleted_message)); - let sent = response .snapshot .events @@ -174,7 +164,7 @@ async fn excludes_deleted_messages() { } #[tokio::test] -async fn excludes_expired_channels() { +async fn 
includes_expired_channels() { let app = fixtures::scratch_app().await; let expired_channel = fixtures::channel::create(&app, &fixtures::ancient()).await; @@ -188,8 +178,6 @@ async fn excludes_expired_channels() { .await .expect("boot always succeeds"); - assert!(!response.snapshot.channels.contains(&expired_channel)); - let created = response .snapshot .events @@ -215,7 +203,7 @@ async fn excludes_expired_channels() { } #[tokio::test] -async fn excludes_deleted_channels() { +async fn includes_deleted_channels() { let app = fixtures::scratch_app().await; let deleted_channel = fixtures::channel::create(&app, &fixtures::now()).await; @@ -229,8 +217,6 @@ async fn excludes_deleted_channels() { .await .expect("boot always succeeds"); - assert!(!response.snapshot.channels.contains(&deleted_channel)); - let created = response .snapshot .events diff --git a/src/boot/mod.rs b/src/boot/mod.rs index 148e87d..e0d35d9 100644 --- a/src/boot/mod.rs +++ b/src/boot/mod.rs @@ -1,4 +1,4 @@ -use crate::{channel::Channel, event::Event, event::Sequence, message::Message, user::User}; +use crate::{event::Event, event::Sequence}; pub mod app; pub mod handlers; @@ -6,8 +6,5 @@ pub mod handlers; #[derive(serde::Serialize)] pub struct Snapshot { pub resume_point: Sequence, - pub users: Vec, - pub channels: Vec, - pub messages: Vec, pub events: Vec, } diff --git a/src/channel/handlers/create/test.rs b/src/channel/handlers/create/test.rs index 595a879..31bb778 100644 --- a/src/channel/handlers/create/test.rs +++ b/src/channel/handlers/create/test.rs @@ -2,6 +2,7 @@ use std::future; use axum::extract::{Json, State}; use futures::stream::StreamExt as _; +use itertools::Itertools; use crate::{ channel::app, @@ -33,7 +34,14 @@ async fn new_channel() { // Verify the semantics let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - assert!(snapshot.channels.iter().any(|channel| channel == &response)); + let created = snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + assert_eq!(response, created.channel); let channel = app .channels() diff --git a/src/channel/handlers/delete/test.rs b/src/channel/handlers/delete/test.rs index b1e42ea..99c19db 100644 --- a/src/channel/handlers/delete/test.rs +++ b/src/channel/handlers/delete/test.rs @@ -1,4 +1,5 @@ use axum::extract::{Path, State}; +use itertools::Itertools; use crate::{channel::app, test::fixtures}; @@ -28,7 +29,16 @@ pub async fn valid_channel() { // Verify the semantics let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - assert!(!snapshot.channels.contains(&channel)); + let created = snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + // We don't expect `channel` to match the event exactly, as the name will have been + // tombstoned and the channel given a `deleted_at` date. + assert_eq!(channel.id, created.channel.id); } #[tokio::test] diff --git a/src/channel/history.rs b/src/channel/history.rs index faf6a0e..7f18e45 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -27,12 +27,6 @@ impl History { self.channel.clone() } - pub fn as_of(&self, resume_point: Sequence) -> Option { - self.events() - .filter(Sequence::up_to(resume_point)) - .collect() - } - // Snapshot of this channel as of all events recorded in this history. 
pub fn as_snapshot(&self) -> Option { self.events().collect() diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs index 15aa2c2..f567eb7 100644 --- a/src/message/handlers/delete/test.rs +++ b/src/message/handlers/delete/test.rs @@ -1,4 +1,5 @@ use axum::extract::{Path, State}; +use itertools::Itertools; use crate::{message::app, test::fixtures}; @@ -29,7 +30,14 @@ pub async fn delete_message() { // Verify the semantics let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - assert!(!snapshot.messages.contains(&message)); + let deleted = snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .exactly_one() + .expect("only one message has been deleted"); + assert_eq!(response.id, deleted.id) } #[tokio::test] diff --git a/src/message/history.rs b/src/message/history.rs index 1a72c08..7585e1c 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -27,12 +27,6 @@ impl History { self.message.clone() } - pub fn as_of(&self, resume_point: Sequence) -> Option { - self.events() - .filter(Sequence::up_to(resume_point)) - .collect() - } - // Snapshot of this message as of all events recorded in this history. pub fn as_snapshot(&self) -> Option { self.events().collect() diff --git a/src/user/history.rs b/src/user/history.rs index ae7a561..72e0aee 100644 --- a/src/user/history.rs +++ b/src/user/history.rs @@ -2,7 +2,7 @@ use super::{ Id, User, event::{Created, Event}, }; -use crate::event::{Instant, Sequence}; +use crate::event::Instant; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -24,12 +24,6 @@ impl History { self.user.clone() } - pub fn as_of(&self, resume_point: Sequence) -> Option { - self.events() - .filter(Sequence::up_to(resume_point)) - .collect() - } - // Snapshot of this user, as of all events recorded in this history. pub fn as_snapshot(&self) -> Option { self.events().collect() -- cgit v1.2.3