diff options
26 files changed, 450 insertions, 294 deletions
diff --git a/docs/api/boot.md b/docs/api/boot.md index 7e3dda3..f6e6dc2 100644 --- a/docs/api/boot.md +++ b/docs/api/boot.md @@ -21,8 +21,8 @@ sequenceDiagram Client initialization serves three purposes: - It confirms that the client's [identity token](./authentication.md) is valid, and tells the client what user that token is associated with. -- It provides an initial snapshot of the state of the service. -- It provides a resume point for the [event stream](./events.md), which allows clients to consume events starting from the moment the snapshot was created. +- It provides an initial event collection. +- It provides a resume point for the [event stream](./events.md), which allows clients to consume events starting after the initial event collection. ## `GET /api/boot` @@ -43,26 +43,54 @@ This endpoint will respond with a status of }, "resume_point": 1312, "heartbeat": 30, - "users": [ + "events": [ { + "type": "user", + "event": "created", + "at": "2025-04-14T23:58:10.421901Z", "id": "U1234abcd", "name": "example username" - } - ], - "channels": [ + }, { + "type": "channel", + "event": "created", "at": "2025-04-14T23:58:11.421901Z", - "name": "nonsense and such", - "id": "C1234abcd" - } - ], - "messages": [ + "id": "C1234abcd", + "name": "nonsense and such" + }, { + "type": "message", + "event": "sent", "at": "2024-09-27T23:19:10.208147Z", "channel": "C1234abcd", "sender": "U1234abcd", "id": "M1312acab", "body": "beep" + }, + { + "type": "message", + "event": "sent", + "at": "2025-06-19T15:14:40.431627Z", + "channel": "Ccfdryfdb4krpy77", + "sender": "U888j6fyc8ccrnkf", + "id": "Mc6jk823wjc82734", + "body": "test" + }, + { + "type": "channel", + "event": "created", + "at": "2025-06-19T15:14:44.764263Z", + "id": "C2d9y6wckph3n36x", + "name": "noob" + }, + { + "type": "message", + "event": "sent", + "at": "2025-06-19T15:29:47.376455Z", + "channel": "Ccfdryfdb4krpy77", + "sender": "U888j6fyc8ccrnkf", + "id": "M3twnj7rfk2ph744", + "body": "test" } ] } @@ -75,9 +103,10 @@ The response will include the following fields: | `user` | object | The details of the caller's identity. | | `resume_point` | integer | A resume point for [events](./events.md), such that the event stream will begin immediately after the included snapshot. | | `heartbeat` | integer | The [heartbeat timeout](./events.md#heartbeat-events), in seconds, for events. | -| `users` | array of object | A snapshot of the users present in the service. | -| `channels` | array of object | A snapshot of the channels present in the service. | -| `messages` | array of object | A snapshot of the messages present in the service. | +| `events` | array of object | The events on the server up to the resume point. | + +Each element of the +`events` object is an event, as described in [Events](./events.md). Events are provided in the same order as they would appear in the event stream response. The `user` object will include the following fields: @@ -85,28 +114,3 @@ The `user` object will include the following fields: | :----- | :----- | :--------------------------------------- | | `name` | string | The name of the caller's login identity. | | `id` | string | The ID of the caller's login identity. | - -Each element of the `users` array describes a distinct user, and will include the following fields: - -| Field | Type | Description | -| :----- | :----- | :----------------------------------------------------------------------------------------------------------------------------------- | -| `name` | string | The name for the user. | -| `id` | string | A unique identifier for the user. This can be used to associate the user with other events, or to make API calls targeting the user. | - -Each element of the `channels` array describes a distinct channel, and will include the following fields: - -| Field | Type | Description | -| :----- | :-------- | :-------------------------------------------------------------------------------------------------------------------------------------------- | -| `at` | timestamp | The moment the channel was created. | -| `name` | string | The name for the channel. | -| `id` | string | A unique identifier for the channel. This can be used to associate the channel with other events, or to make API calls targeting the channel. | - -Each element of the `messages` array describes a distinct message, and will include the following fields: - -| Field | Type | Description | -| :-------- | :-------- | :-------------------------------------------------------------------------------------------------------------------------------------------- | -| `at` | timestamp | The moment the message was sent. | -| `channel` | string | The ID of the channel the message was sent to. | -| `sender` | string | The ID of the user that sent the message. | -| `id` | string | A unique identifier for the message. This can be used to associate the message with other events, or to make API calls targeting the message. | -| `body` | string | The text of the message. | diff --git a/src/boot/app.rs b/src/boot/app.rs index cd45c38..89eec12 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -1,10 +1,11 @@ +use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; use super::Snapshot; use crate::{ channel::{self, repo::Provider as _}, - event::{Heartbeat, repo::Provider as _}, - message::repo::Provider as _, + event::{Event, Sequence, repo::Provider as _}, + message::{self, repo::Provider as _}, name, user::{self, repo::Provider as _}, }; @@ -21,7 +22,6 @@ impl<'a> Boot<'a> { pub async fn snapshot(&self) -> Result<Snapshot, Error> { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; - let heartbeat = Heartbeat::TIMEOUT; let users = tx.users().all(resume_point).await?; let channels = tx.channels().all(resume_point).await?; @@ -29,27 +29,35 @@ impl<'a> Boot<'a> { tx.commit().await?; - let users = users - .into_iter() - .filter_map(|user| user.as_of(resume_point)) - .collect(); + let user_events = users + .iter() + .map(user::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::up_to(resume_point)) + .map(Event::from); - let channels = channels - .into_iter() - .filter_map(|channel| channel.as_of(resume_point)) - .collect(); + let channel_events = channels + .iter() + .map(channel::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::up_to(resume_point)) + .map(Event::from); + + let message_events = messages + .iter() + .map(message::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::up_to(resume_point)) + .map(Event::from); - let messages = messages - .into_iter() - .filter_map(|message| message.as_of(resume_point)) + let events = user_events + .merge_by(channel_events, Sequence::merge) + .merge_by(message_events, Sequence::merge) .collect(); Ok(Snapshot { resume_point, - heartbeat, - users, - channels, - messages, + events, }) } } diff --git a/src/boot/handlers/boot/mod.rs b/src/boot/handlers/boot/mod.rs index 010f57b..49691f7 100644 --- a/src/boot/handlers/boot/mod.rs +++ b/src/boot/handlers/boot/mod.rs @@ -1,17 +1,26 @@ +use std::time::Duration; + use axum::{ extract::{Json, State}, response::{self, IntoResponse}, }; +use serde::Serialize; -use crate::{app::App, boot::Snapshot, error::Internal, token::extract::Identity, user::User}; +use crate::{ + app::App, boot::Snapshot, error::Internal, event::Heartbeat, token::extract::Identity, + user::User, +}; #[cfg(test)] mod test; pub async fn handler(State(app): State<App>, identity: Identity) -> Result<Response, Internal> { let snapshot = app.boot().snapshot().await?; + let heartbeat = Heartbeat::TIMEOUT; + Ok(Response { user: identity.user, + heartbeat, snapshot, }) } @@ -19,6 +28,8 @@ pub async fn handler(State(app): State<App>, identity: Identity) -> Result<Respo #[derive(serde::Serialize)] pub struct Response { pub user: User, + #[serde(serialize_with = "as_seconds")] + pub heartbeat: Duration, #[serde(flatten)] pub snapshot: Snapshot, } @@ -28,3 +39,10 @@ impl IntoResponse for Response { Json(self).into_response() } } + +fn as_seconds<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error> +where + S: serde::Serializer, +{ + duration.as_secs().serialize(serializer) +} diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs index 0a7622b..1e590a7 100644 --- a/src/boot/handlers/boot/test.rs +++ b/src/boot/handlers/boot/test.rs @@ -1,4 +1,5 @@ use axum::extract::State; +use itertools::Itertools as _; use crate::test::fixtures; @@ -15,7 +16,7 @@ async fn returns_identity() { } #[tokio::test] -async fn includes_logins() { +async fn includes_users() { let app = fixtures::scratch_app().await; let spectator = fixtures::user::create(&app, &fixtures::now()).await; @@ -24,7 +25,15 @@ async fn includes_logins() { .await .expect("boot always succeeds"); - assert!(response.snapshot.users.contains(&spectator)); + let created = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::user) + .filter_map(fixtures::event::user::created) + .exactly_one() + .expect("only one user has been created"); + assert_eq!(spectator, created.user) } #[tokio::test] @@ -37,7 +46,15 @@ async fn includes_channels() { .await .expect("boot always succeeds"); - assert!(response.snapshot.channels.contains(&channel)); + let created = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + assert_eq!(channel, created.channel); } #[tokio::test] @@ -52,11 +69,19 @@ async fn includes_messages() { .await .expect("boot always succeeds"); - assert!(response.snapshot.messages.contains(&message)); + let sent = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .exactly_one() + .expect("only one message has been sent"); + assert_eq!(message, sent.message); } #[tokio::test] -async fn excludes_expired_messages() { +async fn includes_expired_messages() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; @@ -73,11 +98,32 @@ async fn excludes_expired_messages() { .await .expect("boot always succeeds"); - assert!(!response.snapshot.messages.contains(&expired_message)); + let sent = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .exactly_one() + .expect("only one message has been sent"); + // We don't expect `expired_message` to match the event exactly, as the body will have been + // tombstoned and the message given a `deleted_at` date. + assert_eq!(expired_message.id, sent.message.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .exactly_one() + .expect("only one message has expired"); + assert_eq!(expired_message.id, deleted.id); } #[tokio::test] -async fn excludes_deleted_messages() { +async fn includes_deleted_messages() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; @@ -93,11 +139,32 @@ async fn excludes_deleted_messages() { .await .expect("boot always succeeds"); - assert!(!response.snapshot.messages.contains(&deleted_message)); + let sent = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .exactly_one() + .expect("only one message has been sent"); + // We don't expect `deleted_message` to match the event exactly, as the body will have been + // tombstoned and the message given a `deleted_at` date. + assert_eq!(deleted_message.id, sent.message.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .exactly_one() + .expect("only one message has been deleted"); + assert_eq!(deleted_message.id, deleted.id); } #[tokio::test] -async fn excludes_expired_channels() { +async fn includes_expired_channels() { let app = fixtures::scratch_app().await; let expired_channel = fixtures::channel::create(&app, &fixtures::ancient()).await; @@ -111,11 +178,32 @@ async fn excludes_expired_channels() { .await .expect("boot always succeeds"); - assert!(!response.snapshot.channels.contains(&expired_channel)); + let created = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + // We don't expect `expired_channel` to match the event exactly, as the name will have been + // tombstoned and the channel given a `deleted_at` date. + assert_eq!(expired_channel.id, created.channel.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .exactly_one() + .expect("only one channel has expired"); + assert_eq!(expired_channel.id, deleted.id); } #[tokio::test] -async fn excludes_deleted_channels() { +async fn includes_deleted_channels() { let app = fixtures::scratch_app().await; let deleted_channel = fixtures::channel::create(&app, &fixtures::now()).await; @@ -129,5 +217,26 @@ async fn excludes_deleted_channels() { .await .expect("boot always succeeds"); - assert!(!response.snapshot.channels.contains(&deleted_channel)); + let created = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + // We don't expect `deleted_channel` to match the event exactly, as the name will have been + // tombstoned and the channel given a `deleted_at` date. + assert_eq!(deleted_channel.id, created.channel.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .exactly_one() + .expect("only one channel has been deleted"); + assert_eq!(deleted_channel.id, deleted.id); } diff --git a/src/boot/mod.rs b/src/boot/mod.rs index 48da4f0..e0d35d9 100644 --- a/src/boot/mod.rs +++ b/src/boot/mod.rs @@ -1,8 +1,4 @@ -use std::time::Duration; - -use serde::Serialize; - -use crate::{channel::Channel, event::Sequence, message::Message, user::User}; +use crate::{event::Event, event::Sequence}; pub mod app; pub mod handlers; @@ -10,16 +6,5 @@ pub mod handlers; #[derive(serde::Serialize)] pub struct Snapshot { pub resume_point: Sequence, - #[serde(serialize_with = "as_seconds")] - pub heartbeat: Duration, - pub users: Vec<User>, - pub channels: Vec<Channel>, - pub messages: Vec<Message>, -} - -fn as_seconds<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error> -where - S: serde::Serializer, -{ - duration.as_secs().serialize(serializer) + pub events: Vec<Event>, } diff --git a/src/channel/handlers/create/test.rs b/src/channel/handlers/create/test.rs index 3c770cf..31bb778 100644 --- a/src/channel/handlers/create/test.rs +++ b/src/channel/handlers/create/test.rs @@ -2,6 +2,7 @@ use std::future; use axum::extract::{Json, State}; use futures::stream::StreamExt as _; +use itertools::Itertools; use crate::{ channel::app, @@ -33,7 +34,14 @@ async fn new_channel() { // Verify the semantics let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - assert!(snapshot.channels.iter().any(|channel| channel == &response)); + let created = snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + assert_eq!(response, created.channel); let channel = app .channels() @@ -47,8 +55,8 @@ async fn new_channel() { .subscribe(resume_point) .await .expect("subscribing never fails") - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::created) .filter(|event| future::ready(event.channel == response)); let event = events.next().expect_some("creation event published").await; diff --git a/src/channel/handlers/delete/test.rs b/src/channel/handlers/delete/test.rs index b1e42ea..99c19db 100644 --- a/src/channel/handlers/delete/test.rs +++ b/src/channel/handlers/delete/test.rs @@ -1,4 +1,5 @@ use axum::extract::{Path, State}; +use itertools::Itertools; use crate::{channel::app, test::fixtures}; @@ -28,7 +29,16 @@ pub async fn valid_channel() { // Verify the semantics let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - assert!(!snapshot.channels.contains(&channel)); + let created = snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + // We don't expect `channel` to match the event exactly, as the name will have been + // tombstoned and the channel given a `deleted_at` date. + assert_eq!(channel.id, created.channel.id); } #[tokio::test] diff --git a/src/channel/handlers/send/test.rs b/src/channel/handlers/send/test.rs index f43f901..7204ca4 100644 --- a/src/channel/handlers/send/test.rs +++ b/src/channel/handlers/send/test.rs @@ -45,8 +45,8 @@ async fn messages_in_order() { .subscribe(resume_point) .await .expect("subscribing to a valid channel succeeds") - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(requests)); while let Some((event, (sent_at, body))) = events diff --git a/src/channel/history.rs b/src/channel/history.rs index faf6a0e..7f18e45 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -27,12 +27,6 @@ impl History { self.channel.clone() } - pub fn as_of(&self, resume_point: Sequence) -> Option<Channel> { - self.events() - .filter(Sequence::up_to(resume_point)) - .collect() - } - // Snapshot of this channel as of all events recorded in this history. pub fn as_snapshot(&self) -> Option<Channel> { self.events().collect() diff --git a/src/event/handlers/stream/test/channel.rs b/src/event/handlers/stream/test/channel.rs index 187c3c3..2b87ce2 100644 --- a/src/event/handlers/stream/test/channel.rs +++ b/src/event/handlers/stream/test/channel.rs @@ -35,8 +35,8 @@ async fn creating() { // Verify channel created event events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::created) .filter(|event| future::ready(event.channel == channel)) .next() .expect_some("channel created event is delivered") @@ -74,8 +74,8 @@ async fn previously_created() { // Verify channel created event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::created) .filter(|event| future::ready(event.channel == channel)) .next() .expect_some("channel created event is delivered") @@ -111,8 +111,8 @@ async fn expiring() { // Check for expiry event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_some("a deleted channel event will be delivered") @@ -148,8 +148,8 @@ async fn previously_expired() { // Check for expiry event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_some("a deleted channel event will be delivered") @@ -185,8 +185,8 @@ async fn deleting() { // Check for delete event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_some("a deleted channel event will be delivered") @@ -222,8 +222,8 @@ async fn previously_deleted() { // Check for expiry event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_some("a deleted channel event will be delivered") @@ -264,8 +264,8 @@ async fn previously_purged() { // Check for expiry event events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_wait("deleted channel events not delivered") diff --git a/src/event/handlers/stream/test/invite.rs b/src/event/handlers/stream/test/invite.rs index c8e12fb..01372ce 100644 --- a/src/event/handlers/stream/test/invite.rs +++ b/src/event/handlers/stream/test/invite.rs @@ -37,8 +37,8 @@ async fn accepting_invite() { // Expect a login created event let _ = events - .filter_map(fixtures::event::user) - .filter_map(fixtures::event::user::created) + .filter_map(fixtures::event::stream::user) + .filter_map(fixtures::event::stream::user::created) .filter(|event| future::ready(event.user == joiner)) .next() .expect_some("a login created event is sent") @@ -78,8 +78,8 @@ async fn previously_accepted_invite() { // Expect a login created event let _ = events - .filter_map(fixtures::event::user) - .filter_map(fixtures::event::user::created) + .filter_map(fixtures::event::stream::user) + .filter_map(fixtures::event::stream::user::created) .filter(|event| future::ready(event.user == joiner)) .next() .expect_some("a login created event is sent") diff --git a/src/event/handlers/stream/test/message.rs b/src/event/handlers/stream/test/message.rs index a80c896..4369996 100644 --- a/src/event/handlers/stream/test/message.rs +++ b/src/event/handlers/stream/test/message.rs @@ -44,8 +44,8 @@ async fn sending() { // Verify that an event is delivered let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == message)) .next() .expect_some("delivered message sent event") @@ -89,8 +89,8 @@ async fn previously_sent() { // Verify that an event is delivered let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == message)) .next() .expect_some("delivered message sent event") @@ -135,8 +135,8 @@ async fn sent_in_multiple_channels() { // Verify the structure of the response. let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .take(messages.len()) .collect::<Vec<_>>() .expect_ready("events ready") @@ -177,8 +177,8 @@ async fn sent_sequentially() { // Verify the expected events in the expected order let mut events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))); for message in &messages { @@ -222,8 +222,8 @@ async fn expiring() { // Check for expiry event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -261,8 +261,8 @@ async fn previously_expired() { // Check for expiry event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -300,8 +300,8 @@ async fn deleting() { // Check for delete event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -339,8 +339,8 @@ async fn previously_deleted() { // Check for delete event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -384,8 +384,8 @@ async fn previously_purged() { // Check for delete event events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_wait("no deleted message will be delivered") diff --git a/src/event/handlers/stream/test/resume.rs b/src/event/handlers/stream/test/resume.rs index 34fee4d..835d350 100644 --- a/src/event/handlers/stream/test/resume.rs +++ b/src/event/handlers/stream/test/resume.rs @@ -41,8 +41,8 @@ async fn resume() { .expect("subscribe never fails"); let event = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == initial_message)) .next() .expect_some("delivered event for initial message") @@ -64,8 +64,8 @@ async fn resume() { // Verify final events let mut events = resumed - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(later_messages)); while let Some((event, message)) = events.next().expect_ready("event ready").await { @@ -125,8 +125,8 @@ async fn serial_resume() { // Check for expected events let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(initial_messages)) .collect::<Vec<_>>() .expect_ready("zipping a finite list of events is ready immediately") @@ -168,8 +168,8 @@ async fn serial_resume() { // Check for expected events let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(resume_messages)) .collect::<Vec<_>>() .expect_ready("zipping a finite list of events is ready immediately") @@ -211,8 +211,8 @@ async fn serial_resume() { // Check for expected events let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(final_messages)) .collect::<Vec<_>>() .expect_ready("zipping a finite list of events is ready immediately") diff --git a/src/event/handlers/stream/test/setup.rs b/src/event/handlers/stream/test/setup.rs index 5335055..992b962 100644 --- a/src/event/handlers/stream/test/setup.rs +++ b/src/event/handlers/stream/test/setup.rs @@ -38,8 +38,8 @@ async fn previously_completed() { // Expect a login created event let _ = events - .filter_map(fixtures::event::user) - .filter_map(fixtures::event::user::created) + .filter_map(fixtures::event::stream::user) + .filter_map(fixtures::event::stream::user::created) .filter(|event| future::ready(event.user == owner)) .next() .expect_some("a login created event is sent") diff --git a/src/event/handlers/stream/test/token.rs b/src/event/handlers/stream/test/token.rs index 2008323..e32b489 100644 --- a/src/event/handlers/stream/test/token.rs +++ b/src/event/handlers/stream/test/token.rs @@ -43,8 +43,8 @@ async fn terminates_on_token_expiry() { ]; events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .expect_none("end of stream") @@ -89,8 +89,8 @@ async fn terminates_on_logout() { ]; events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .expect_none("end of stream") @@ -139,8 +139,8 @@ async fn terminates_on_password_change() { ]; events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .expect_none("end of stream") diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs index 15aa2c2..f567eb7 100644 --- a/src/message/handlers/delete/test.rs +++ b/src/message/handlers/delete/test.rs @@ -1,4 +1,5 @@ use axum::extract::{Path, State}; +use itertools::Itertools; use crate::{message::app, test::fixtures}; @@ -29,7 +30,14 @@ pub async fn delete_message() { // Verify the semantics let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - assert!(!snapshot.messages.contains(&message)); + let deleted = snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .exactly_one() + .expect("only one message has been deleted"); + assert_eq!(response.id, deleted.id) } #[tokio::test] diff --git a/src/message/history.rs b/src/message/history.rs index 1a72c08..7585e1c 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -27,12 +27,6 @@ impl History { self.message.clone() } - pub fn as_of(&self, resume_point: Sequence) -> Option<Message> { - self.events() - .filter(Sequence::up_to(resume_point)) - .collect() - } - // Snapshot of this message as of all events recorded in this history. pub fn as_snapshot(&self) -> Option<Message> { self.events().collect() diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs deleted file mode 100644 index a30bb4b..0000000 --- a/src/test/fixtures/event.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::future::{self, Ready}; - -use crate::event::Event; - -pub fn channel(event: Event) -> Ready<Option<channel::Event>> { - future::ready(match event { - Event::Channel(channel) => Some(channel), - _ => None, - }) -} - -pub fn message(event: Event) -> Ready<Option<message::Event>> { - future::ready(match event { - Event::Message(event) => Some(event), - _ => None, - }) -} - -pub fn user(event: Event) -> Ready<Option<user::Event>> { - future::ready(match event { - Event::User(event) => Some(event), - _ => None, - }) -} - -pub mod channel { - use std::future::{self, Ready}; - - pub use crate::channel::Event; - use crate::channel::event; - - pub fn created(event: Event) -> Ready<Option<event::Created>> { - future::ready(match event { - Event::Created(event) => Some(event), - Event::Deleted(_) => None, - }) - } - - pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> { - future::ready(match event { - Event::Deleted(event) => Some(event), - Event::Created(_) => None, - }) - } -} - -pub mod message { - use std::future::{self, Ready}; - - pub use crate::message::Event; - use crate::message::event; - - pub fn sent(event: Event) -> Ready<Option<event::Sent>> { - future::ready(match event { - Event::Sent(event) => Some(event), - Event::Deleted(_) => None, - }) - } - - pub fn deleted(event: Event) -> future::Ready<Option<event::Deleted>> { - future::ready(match event { - Event::Deleted(event) => Some(event), - Event::Sent(_) => None, - }) - } -} - -pub mod user { - use std::future::{self, Ready}; - - pub use crate::user::Event; - use crate::user::event; - - pub fn created(event: Event) -> Ready<Option<event::Created>> { - future::ready(match event { - Event::Created(event) => Some(event), - }) - } -} diff --git a/src/test/fixtures/event/mod.rs b/src/test/fixtures/event/mod.rs new file mode 100644 index 0000000..691cdeb --- /dev/null +++ b/src/test/fixtures/event/mod.rs @@ -0,0 +1,74 @@ +use crate::event::Event; + +pub mod stream; + +pub fn channel(event: Event) -> Option<crate::channel::Event> { + match event { + Event::Channel(channel) => Some(channel), + _ => None, + } +} + +pub fn message(event: Event) -> Option<crate::message::Event> { + match event { + Event::Message(event) => Some(event), + _ => None, + } +} + +pub fn user(event: Event) -> Option<crate::user::Event> { + match event { + Event::User(event) => Some(event), + _ => None, + } +} + +pub mod channel { + use crate::channel::{Event, event}; + + pub fn created(event: Event) -> Option<event::Created> { + match event { + Event::Created(event) => Some(event), + Event::Deleted(_) => None, + } + } + + pub fn deleted(event: Event) -> Option<event::Deleted> { + match event { + Event::Deleted(event) => Some(event), + Event::Created(_) => None, + } + } +} + +pub mod message { + use crate::message::{Event, event}; + + pub fn sent(event: Event) -> Option<event::Sent> { + match event { + Event::Sent(event) => Some(event), + Event::Deleted(_) => None, + } + } + + pub fn deleted(event: Event) -> Option<event::Deleted> { + match event { + Event::Deleted(event) => Some(event), + Event::Sent(_) => None, + } + } +} + +pub mod user { + use crate::user::{Event, event}; + + // This could be defined as `-> event::Created`. However, I want the interface to be consistent + // with the event stream transformers for other types, and we'd have to refactor the return type + // to `-> Option<event::Created>` the instant users sprout a second event, anyways. + #[allow(clippy::unnecessary_wraps)] + pub fn created(event: Event) -> Option<event::Created> { + match event { + Event::Created(event) => Some(event), + } + } +} diff --git a/src/test/fixtures/event/stream.rs b/src/test/fixtures/event/stream.rs new file mode 100644 index 0000000..6c2a1bf --- /dev/null +++ b/src/test/fixtures/event/stream.rs @@ -0,0 +1,62 @@ +use std::future::{self, Ready}; + +use crate::{event::Event, test::fixtures::event}; + +pub fn channel(event: Event) -> Ready<Option<crate::channel::Event>> { + future::ready(event::channel(event)) +} + +pub fn message(event: Event) -> Ready<Option<crate::message::Event>> { + future::ready(event::message(event)) +} + +pub fn user(event: Event) -> Ready<Option<crate::user::Event>> { + future::ready(event::user(event)) +} + +pub mod channel { + use std::future::{self, Ready}; + + use crate::{ + channel::{Event, event}, + test::fixtures::event::channel, + }; + + pub fn created(event: Event) -> Ready<Option<event::Created>> { + future::ready(channel::created(event)) + } + + pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> { + future::ready(channel::deleted(event)) + } +} + +pub mod message { + use std::future::{self, Ready}; + + use crate::{ + message::{Event, event}, + test::fixtures::event::message, + }; + + pub fn sent(event: Event) -> Ready<Option<event::Sent>> { + future::ready(message::sent(event)) + } + + pub fn deleted(event: Event) -> future::Ready<Option<event::Deleted>> { + future::ready(message::deleted(event)) + } +} + +pub mod user { + use std::future::{self, Ready}; + + use crate::{ + test::fixtures::event::user, + user::{Event, event}, + }; + + pub fn created(event: Event) -> Ready<Option<event::Created>> { + future::ready(user::created(event)) + } +} diff --git a/src/user/history.rs b/src/user/history.rs index ae7a561..72e0aee 100644 --- a/src/user/history.rs +++ b/src/user/history.rs @@ -2,7 +2,7 @@ use super::{ Id, User, event::{Created, Event}, }; -use crate::event::{Instant, Sequence}; +use crate::event::Instant; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -24,12 +24,6 @@ impl History { self.user.clone() } - pub fn as_of(&self, resume_point: Sequence) -> Option<User> { - self.events() - .filter(Sequence::up_to(resume_point)) - .collect() - } - // Snapshot of this user, as of all events recorded in this history. pub fn as_snapshot(&self) -> Option<User> { self.events().collect() diff --git a/ui/lib/session.svelte.js b/ui/lib/session.svelte.js index d742dbe..838401c 100644 --- a/ui/lib/session.svelte.js +++ b/ui/lib/session.svelte.js @@ -64,27 +64,23 @@ class Session { ), ); - static boot({ user, users, channels, messages, resume_point, heartbeat }) { + static boot({ user, resume_point, heartbeat, events }) { const remote = r.State.boot({ currentUser: user, - users, - channels, - messages, resumePoint: resume_point, heartbeat, + events, }); const local = l.Channels.fromLocalStorage(); return new Session(remote, local); } - reboot({ user, users, channels, messages, resume_point, heartbeat }) { + reboot({ user, resume_point, heartbeat, events }) { this.remote = r.State.boot({ currentUser: user, - users, - channels, - messages, resumePoint: resume_point, heartbeat, + events, }); } diff --git a/ui/lib/state/remote/channels.svelte.js b/ui/lib/state/remote/channels.svelte.js index b2888cb..1e40075 100644 --- a/ui/lib/state/remote/channels.svelte.js +++ b/ui/lib/state/remote/channels.svelte.js @@ -19,15 +19,6 @@ class Channel { export class Channels { all = $state([]); - static boot(channels) { - const all = channels.map((channel) => Channel.boot(channel)); - return new Channels({ all }); - } - - constructor({ all }) { - this.all = all; - } - add({ at, id, name }) { this.all.push(Channel.boot({ at, id, name })); } diff --git a/ui/lib/state/remote/messages.svelte.js b/ui/lib/state/remote/messages.svelte.js index 7ce28b4..1be001b 100644 --- a/ui/lib/state/remote/messages.svelte.js +++ b/ui/lib/state/remote/messages.svelte.js @@ -26,15 +26,6 @@ class Message { export class Messages { all = $state([]); - static boot(messages) { - const all = messages.map(Message.boot); - return new Messages({ all }); - } - - constructor({ all }) { - this.all = all; - } - add({ id, at, channel, sender, body }) { const message = Message.boot({ id, at, channel, sender, body }); this.all.push(message); diff --git a/ui/lib/state/remote/state.svelte.js b/ui/lib/state/remote/state.svelte.js index a30e0fe..fb46489 100644 --- a/ui/lib/state/remote/state.svelte.js +++ b/ui/lib/state/remote/state.svelte.js @@ -4,27 +4,25 @@ import { Messages } from './messages.svelte.js'; export class State { currentUser = $state(); - users = $state(); - channels = $state(); - messages = $state(); + users = $state(new Users()); + channels = $state(new Channels()); + messages = $state(new Messages()); - static boot({ currentUser, heartbeat, users, channels, messages, resumePoint }) { - return new State({ + static boot({ currentUser, heartbeat, resumePoint, events }) { + const state = new State({ currentUser: User.boot(currentUser), heartbeat, - users: Users.boot(users), - channels: Channels.boot(channels), - messages: Messages.boot(messages), resumePoint, }); + for (const event of events) { + state.onEvent(event); + } + return state; } - constructor({ currentUser, heartbeat, users, channels, messages, resumePoint }) { + constructor({ currentUser, heartbeat, resumePoint }) { this.currentUser = currentUser; this.heartbeat = heartbeat; - this.users = users; - this.channels = channels; - this.messages = messages; this.resumePoint = resumePoint; } diff --git a/ui/lib/state/remote/users.svelte.js b/ui/lib/state/remote/users.svelte.js index a15d1da..391ab3a 100644 --- a/ui/lib/state/remote/users.svelte.js +++ b/ui/lib/state/remote/users.svelte.js @@ -16,16 +16,7 @@ export class User { } export class Users { - all = $state(); - - static boot(users) { - const all = new SvelteMap(users.map((user) => [user.id, User.boot(user)])); - return new Users({ all }); - } - - constructor({ all }) { - this.all = all; - } + all = $state(new SvelteMap()); add({ id, name }) { this.all.set(id, new User(id, name)); |
