summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/api/boot.md82
-rw-r--r--src/boot/app.rs44
-rw-r--r--src/boot/handlers/boot/mod.rs20
-rw-r--r--src/boot/handlers/boot/test.rs133
-rw-r--r--src/boot/mod.rs19
-rw-r--r--src/channel/handlers/create/test.rs14
-rw-r--r--src/channel/handlers/delete/test.rs12
-rw-r--r--src/channel/handlers/send/test.rs4
-rw-r--r--src/channel/history.rs6
-rw-r--r--src/event/handlers/stream/test/channel.rs28
-rw-r--r--src/event/handlers/stream/test/invite.rs8
-rw-r--r--src/event/handlers/stream/test/message.rs36
-rw-r--r--src/event/handlers/stream/test/resume.rs20
-rw-r--r--src/event/handlers/stream/test/setup.rs4
-rw-r--r--src/event/handlers/stream/test/token.rs12
-rw-r--r--src/message/handlers/delete/test.rs10
-rw-r--r--src/message/history.rs6
-rw-r--r--src/test/fixtures/event.rs79
-rw-r--r--src/test/fixtures/event/mod.rs74
-rw-r--r--src/test/fixtures/event/stream.rs62
-rw-r--r--src/user/history.rs8
-rw-r--r--ui/lib/session.svelte.js12
-rw-r--r--ui/lib/state/remote/channels.svelte.js9
-rw-r--r--ui/lib/state/remote/messages.svelte.js9
-rw-r--r--ui/lib/state/remote/state.svelte.js22
-rw-r--r--ui/lib/state/remote/users.svelte.js11
26 files changed, 450 insertions, 294 deletions
diff --git a/docs/api/boot.md b/docs/api/boot.md
index 7e3dda3..f6e6dc2 100644
--- a/docs/api/boot.md
+++ b/docs/api/boot.md
@@ -21,8 +21,8 @@ sequenceDiagram
Client initialization serves three purposes:
- It confirms that the client's [identity token](./authentication.md) is valid, and tells the client what user that token is associated with.
-- It provides an initial snapshot of the state of the service.
-- It provides a resume point for the [event stream](./events.md), which allows clients to consume events starting from the moment the snapshot was created.
+- It provides an initial event collection.
+- It provides a resume point for the [event stream](./events.md), which allows clients to consume events starting after the initial event collection.
## `GET /api/boot`
@@ -43,26 +43,54 @@ This endpoint will respond with a status of
},
"resume_point": 1312,
"heartbeat": 30,
- "users": [
+ "events": [
{
+ "type": "user",
+ "event": "created",
+ "at": "2025-04-14T23:58:10.421901Z",
"id": "U1234abcd",
"name": "example username"
- }
- ],
- "channels": [
+ },
{
+ "type": "channel",
+ "event": "created",
"at": "2025-04-14T23:58:11.421901Z",
- "name": "nonsense and such",
- "id": "C1234abcd"
- }
- ],
- "messages": [
+ "id": "C1234abcd",
+ "name": "nonsense and such"
+ },
{
+ "type": "message",
+ "event": "sent",
"at": "2024-09-27T23:19:10.208147Z",
"channel": "C1234abcd",
"sender": "U1234abcd",
"id": "M1312acab",
"body": "beep"
+ },
+ {
+ "type": "message",
+ "event": "sent",
+ "at": "2025-06-19T15:14:40.431627Z",
+ "channel": "Ccfdryfdb4krpy77",
+ "sender": "U888j6fyc8ccrnkf",
+ "id": "Mc6jk823wjc82734",
+ "body": "test"
+ },
+ {
+ "type": "channel",
+ "event": "created",
+ "at": "2025-06-19T15:14:44.764263Z",
+ "id": "C2d9y6wckph3n36x",
+ "name": "noob"
+ },
+ {
+ "type": "message",
+ "event": "sent",
+ "at": "2025-06-19T15:29:47.376455Z",
+ "channel": "Ccfdryfdb4krpy77",
+ "sender": "U888j6fyc8ccrnkf",
+ "id": "M3twnj7rfk2ph744",
+ "body": "test"
}
]
}
@@ -75,9 +103,10 @@ The response will include the following fields:
| `user` | object | The details of the caller's identity. |
| `resume_point` | integer | A resume point for [events](./events.md), such that the event stream will begin immediately after the included snapshot. |
| `heartbeat` | integer | The [heartbeat timeout](./events.md#heartbeat-events), in seconds, for events. |
-| `users` | array of object | A snapshot of the users present in the service. |
-| `channels` | array of object | A snapshot of the channels present in the service. |
-| `messages` | array of object | A snapshot of the messages present in the service. |
+| `events` | array of object | The events on the server up to the resume point. |
+
+Each element of the
+`events` object is an event, as described in [Events](./events.md). Events are provided in the same order as they would appear in the event stream response.
The `user` object will include the following fields:
@@ -85,28 +114,3 @@ The `user` object will include the following fields:
| :----- | :----- | :--------------------------------------- |
| `name` | string | The name of the caller's login identity. |
| `id` | string | The ID of the caller's login identity. |
-
-Each element of the `users` array describes a distinct user, and will include the following fields:
-
-| Field | Type | Description |
-| :----- | :----- | :----------------------------------------------------------------------------------------------------------------------------------- |
-| `name` | string | The name for the user. |
-| `id` | string | A unique identifier for the user. This can be used to associate the user with other events, or to make API calls targeting the user. |
-
-Each element of the `channels` array describes a distinct channel, and will include the following fields:
-
-| Field | Type | Description |
-| :----- | :-------- | :-------------------------------------------------------------------------------------------------------------------------------------------- |
-| `at` | timestamp | The moment the channel was created. |
-| `name` | string | The name for the channel. |
-| `id` | string | A unique identifier for the channel. This can be used to associate the channel with other events, or to make API calls targeting the channel. |
-
-Each element of the `messages` array describes a distinct message, and will include the following fields:
-
-| Field | Type | Description |
-| :-------- | :-------- | :-------------------------------------------------------------------------------------------------------------------------------------------- |
-| `at` | timestamp | The moment the message was sent. |
-| `channel` | string | The ID of the channel the message was sent to. |
-| `sender` | string | The ID of the user that sent the message. |
-| `id` | string | A unique identifier for the message. This can be used to associate the message with other events, or to make API calls targeting the message. |
-| `body` | string | The text of the message. |
diff --git a/src/boot/app.rs b/src/boot/app.rs
index cd45c38..89eec12 100644
--- a/src/boot/app.rs
+++ b/src/boot/app.rs
@@ -1,10 +1,11 @@
+use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;
use super::Snapshot;
use crate::{
channel::{self, repo::Provider as _},
- event::{Heartbeat, repo::Provider as _},
- message::repo::Provider as _,
+ event::{Event, Sequence, repo::Provider as _},
+ message::{self, repo::Provider as _},
name,
user::{self, repo::Provider as _},
};
@@ -21,7 +22,6 @@ impl<'a> Boot<'a> {
pub async fn snapshot(&self) -> Result<Snapshot, Error> {
let mut tx = self.db.begin().await?;
let resume_point = tx.sequence().current().await?;
- let heartbeat = Heartbeat::TIMEOUT;
let users = tx.users().all(resume_point).await?;
let channels = tx.channels().all(resume_point).await?;
@@ -29,27 +29,35 @@ impl<'a> Boot<'a> {
tx.commit().await?;
- let users = users
- .into_iter()
- .filter_map(|user| user.as_of(resume_point))
- .collect();
+ let user_events = users
+ .iter()
+ .map(user::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::up_to(resume_point))
+ .map(Event::from);
- let channels = channels
- .into_iter()
- .filter_map(|channel| channel.as_of(resume_point))
- .collect();
+ let channel_events = channels
+ .iter()
+ .map(channel::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::up_to(resume_point))
+ .map(Event::from);
+
+ let message_events = messages
+ .iter()
+ .map(message::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::up_to(resume_point))
+ .map(Event::from);
- let messages = messages
- .into_iter()
- .filter_map(|message| message.as_of(resume_point))
+ let events = user_events
+ .merge_by(channel_events, Sequence::merge)
+ .merge_by(message_events, Sequence::merge)
.collect();
Ok(Snapshot {
resume_point,
- heartbeat,
- users,
- channels,
- messages,
+ events,
})
}
}
diff --git a/src/boot/handlers/boot/mod.rs b/src/boot/handlers/boot/mod.rs
index 010f57b..49691f7 100644
--- a/src/boot/handlers/boot/mod.rs
+++ b/src/boot/handlers/boot/mod.rs
@@ -1,17 +1,26 @@
+use std::time::Duration;
+
use axum::{
extract::{Json, State},
response::{self, IntoResponse},
};
+use serde::Serialize;
-use crate::{app::App, boot::Snapshot, error::Internal, token::extract::Identity, user::User};
+use crate::{
+ app::App, boot::Snapshot, error::Internal, event::Heartbeat, token::extract::Identity,
+ user::User,
+};
#[cfg(test)]
mod test;
pub async fn handler(State(app): State<App>, identity: Identity) -> Result<Response, Internal> {
let snapshot = app.boot().snapshot().await?;
+ let heartbeat = Heartbeat::TIMEOUT;
+
Ok(Response {
user: identity.user,
+ heartbeat,
snapshot,
})
}
@@ -19,6 +28,8 @@ pub async fn handler(State(app): State<App>, identity: Identity) -> Result<Respo
#[derive(serde::Serialize)]
pub struct Response {
pub user: User,
+ #[serde(serialize_with = "as_seconds")]
+ pub heartbeat: Duration,
#[serde(flatten)]
pub snapshot: Snapshot,
}
@@ -28,3 +39,10 @@ impl IntoResponse for Response {
Json(self).into_response()
}
}
+
+fn as_seconds<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
+where
+ S: serde::Serializer,
+{
+ duration.as_secs().serialize(serializer)
+}
diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs
index 0a7622b..1e590a7 100644
--- a/src/boot/handlers/boot/test.rs
+++ b/src/boot/handlers/boot/test.rs
@@ -1,4 +1,5 @@
use axum::extract::State;
+use itertools::Itertools as _;
use crate::test::fixtures;
@@ -15,7 +16,7 @@ async fn returns_identity() {
}
#[tokio::test]
-async fn includes_logins() {
+async fn includes_users() {
let app = fixtures::scratch_app().await;
let spectator = fixtures::user::create(&app, &fixtures::now()).await;
@@ -24,7 +25,15 @@ async fn includes_logins() {
.await
.expect("boot always succeeds");
- assert!(response.snapshot.users.contains(&spectator));
+ let created = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::user)
+ .filter_map(fixtures::event::user::created)
+ .exactly_one()
+ .expect("only one user has been created");
+ assert_eq!(spectator, created.user)
}
#[tokio::test]
@@ -37,7 +46,15 @@ async fn includes_channels() {
.await
.expect("boot always succeeds");
- assert!(response.snapshot.channels.contains(&channel));
+ let created = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .exactly_one()
+ .expect("only one channel has been created");
+ assert_eq!(channel, created.channel);
}
#[tokio::test]
@@ -52,11 +69,19 @@ async fn includes_messages() {
.await
.expect("boot always succeeds");
- assert!(response.snapshot.messages.contains(&message));
+ let sent = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .exactly_one()
+ .expect("only one message has been sent");
+ assert_eq!(message, sent.message);
}
#[tokio::test]
-async fn excludes_expired_messages() {
+async fn includes_expired_messages() {
let app = fixtures::scratch_app().await;
let sender = fixtures::user::create(&app, &fixtures::ancient()).await;
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
@@ -73,11 +98,32 @@ async fn excludes_expired_messages() {
.await
.expect("boot always succeeds");
- assert!(!response.snapshot.messages.contains(&expired_message));
+ let sent = response
+ .snapshot
+ .events
+ .iter()
+ .cloned()
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .exactly_one()
+ .expect("only one message has been sent");
+ // We don't expect `expired_message` to match the event exactly, as the body will have been
+ // tombstoned and the message given a `deleted_at` date.
+ assert_eq!(expired_message.id, sent.message.id);
+
+ let deleted = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .exactly_one()
+ .expect("only one message has expired");
+ assert_eq!(expired_message.id, deleted.id);
}
#[tokio::test]
-async fn excludes_deleted_messages() {
+async fn includes_deleted_messages() {
let app = fixtures::scratch_app().await;
let sender = fixtures::user::create(&app, &fixtures::now()).await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
@@ -93,11 +139,32 @@ async fn excludes_deleted_messages() {
.await
.expect("boot always succeeds");
- assert!(!response.snapshot.messages.contains(&deleted_message));
+ let sent = response
+ .snapshot
+ .events
+ .iter()
+ .cloned()
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::sent)
+ .exactly_one()
+ .expect("only one message has been sent");
+ // We don't expect `deleted_message` to match the event exactly, as the body will have been
+ // tombstoned and the message given a `deleted_at` date.
+ assert_eq!(deleted_message.id, sent.message.id);
+
+ let deleted = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .exactly_one()
+ .expect("only one message has been deleted");
+ assert_eq!(deleted_message.id, deleted.id);
}
#[tokio::test]
-async fn excludes_expired_channels() {
+async fn includes_expired_channels() {
let app = fixtures::scratch_app().await;
let expired_channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
@@ -111,11 +178,32 @@ async fn excludes_expired_channels() {
.await
.expect("boot always succeeds");
- assert!(!response.snapshot.channels.contains(&expired_channel));
+ let created = response
+ .snapshot
+ .events
+ .iter()
+ .cloned()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .exactly_one()
+ .expect("only one channel has been created");
+ // We don't expect `expired_channel` to match the event exactly, as the name will have been
+ // tombstoned and the channel given a `deleted_at` date.
+ assert_eq!(expired_channel.id, created.channel.id);
+
+ let deleted = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .exactly_one()
+ .expect("only one channel has expired");
+ assert_eq!(expired_channel.id, deleted.id);
}
#[tokio::test]
-async fn excludes_deleted_channels() {
+async fn includes_deleted_channels() {
let app = fixtures::scratch_app().await;
let deleted_channel = fixtures::channel::create(&app, &fixtures::now()).await;
@@ -129,5 +217,26 @@ async fn excludes_deleted_channels() {
.await
.expect("boot always succeeds");
- assert!(!response.snapshot.channels.contains(&deleted_channel));
+ let created = response
+ .snapshot
+ .events
+ .iter()
+ .cloned()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .exactly_one()
+ .expect("only one channel has been created");
+ // We don't expect `deleted_channel` to match the event exactly, as the name will have been
+ // tombstoned and the channel given a `deleted_at` date.
+ assert_eq!(deleted_channel.id, created.channel.id);
+
+ let deleted = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .exactly_one()
+ .expect("only one channel has been deleted");
+ assert_eq!(deleted_channel.id, deleted.id);
}
diff --git a/src/boot/mod.rs b/src/boot/mod.rs
index 48da4f0..e0d35d9 100644
--- a/src/boot/mod.rs
+++ b/src/boot/mod.rs
@@ -1,8 +1,4 @@
-use std::time::Duration;
-
-use serde::Serialize;
-
-use crate::{channel::Channel, event::Sequence, message::Message, user::User};
+use crate::{event::Event, event::Sequence};
pub mod app;
pub mod handlers;
@@ -10,16 +6,5 @@ pub mod handlers;
#[derive(serde::Serialize)]
pub struct Snapshot {
pub resume_point: Sequence,
- #[serde(serialize_with = "as_seconds")]
- pub heartbeat: Duration,
- pub users: Vec<User>,
- pub channels: Vec<Channel>,
- pub messages: Vec<Message>,
-}
-
-fn as_seconds<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
-where
- S: serde::Serializer,
-{
- duration.as_secs().serialize(serializer)
+ pub events: Vec<Event>,
}
diff --git a/src/channel/handlers/create/test.rs b/src/channel/handlers/create/test.rs
index 3c770cf..31bb778 100644
--- a/src/channel/handlers/create/test.rs
+++ b/src/channel/handlers/create/test.rs
@@ -2,6 +2,7 @@ use std::future;
use axum::extract::{Json, State};
use futures::stream::StreamExt as _;
+use itertools::Itertools;
use crate::{
channel::app,
@@ -33,7 +34,14 @@ async fn new_channel() {
// Verify the semantics
let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- assert!(snapshot.channels.iter().any(|channel| channel == &response));
+ let created = snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .exactly_one()
+ .expect("only one channel has been created");
+ assert_eq!(response, created.channel);
let channel = app
.channels()
@@ -47,8 +55,8 @@ async fn new_channel() {
.subscribe(resume_point)
.await
.expect("subscribing never fails")
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::created)
.filter(|event| future::ready(event.channel == response));
let event = events.next().expect_some("creation event published").await;
diff --git a/src/channel/handlers/delete/test.rs b/src/channel/handlers/delete/test.rs
index b1e42ea..99c19db 100644
--- a/src/channel/handlers/delete/test.rs
+++ b/src/channel/handlers/delete/test.rs
@@ -1,4 +1,5 @@
use axum::extract::{Path, State};
+use itertools::Itertools;
use crate::{channel::app, test::fixtures};
@@ -28,7 +29,16 @@ pub async fn valid_channel() {
// Verify the semantics
let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- assert!(!snapshot.channels.contains(&channel));
+ let created = snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::created)
+ .exactly_one()
+ .expect("only one channel has been created");
+ // We don't expect `channel` to match the event exactly, as the name will have been
+ // tombstoned and the channel given a `deleted_at` date.
+ assert_eq!(channel.id, created.channel.id);
}
#[tokio::test]
diff --git a/src/channel/handlers/send/test.rs b/src/channel/handlers/send/test.rs
index f43f901..7204ca4 100644
--- a/src/channel/handlers/send/test.rs
+++ b/src/channel/handlers/send/test.rs
@@ -45,8 +45,8 @@ async fn messages_in_order() {
.subscribe(resume_point)
.await
.expect("subscribing to a valid channel succeeds")
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(requests));
while let Some((event, (sent_at, body))) = events
diff --git a/src/channel/history.rs b/src/channel/history.rs
index faf6a0e..7f18e45 100644
--- a/src/channel/history.rs
+++ b/src/channel/history.rs
@@ -27,12 +27,6 @@ impl History {
self.channel.clone()
}
- pub fn as_of(&self, resume_point: Sequence) -> Option<Channel> {
- self.events()
- .filter(Sequence::up_to(resume_point))
- .collect()
- }
-
// Snapshot of this channel as of all events recorded in this history.
pub fn as_snapshot(&self) -> Option<Channel> {
self.events().collect()
diff --git a/src/event/handlers/stream/test/channel.rs b/src/event/handlers/stream/test/channel.rs
index 187c3c3..2b87ce2 100644
--- a/src/event/handlers/stream/test/channel.rs
+++ b/src/event/handlers/stream/test/channel.rs
@@ -35,8 +35,8 @@ async fn creating() {
// Verify channel created event
events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::created)
.filter(|event| future::ready(event.channel == channel))
.next()
.expect_some("channel created event is delivered")
@@ -74,8 +74,8 @@ async fn previously_created() {
// Verify channel created event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::created)
.filter(|event| future::ready(event.channel == channel))
.next()
.expect_some("channel created event is delivered")
@@ -111,8 +111,8 @@ async fn expiring() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_some("a deleted channel event will be delivered")
@@ -148,8 +148,8 @@ async fn previously_expired() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_some("a deleted channel event will be delivered")
@@ -185,8 +185,8 @@ async fn deleting() {
// Check for delete event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_some("a deleted channel event will be delivered")
@@ -222,8 +222,8 @@ async fn previously_deleted() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_some("a deleted channel event will be delivered")
@@ -264,8 +264,8 @@ async fn previously_purged() {
// Check for expiry event
events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_wait("deleted channel events not delivered")
diff --git a/src/event/handlers/stream/test/invite.rs b/src/event/handlers/stream/test/invite.rs
index c8e12fb..01372ce 100644
--- a/src/event/handlers/stream/test/invite.rs
+++ b/src/event/handlers/stream/test/invite.rs
@@ -37,8 +37,8 @@ async fn accepting_invite() {
// Expect a login created event
let _ = events
- .filter_map(fixtures::event::user)
- .filter_map(fixtures::event::user::created)
+ .filter_map(fixtures::event::stream::user)
+ .filter_map(fixtures::event::stream::user::created)
.filter(|event| future::ready(event.user == joiner))
.next()
.expect_some("a login created event is sent")
@@ -78,8 +78,8 @@ async fn previously_accepted_invite() {
// Expect a login created event
let _ = events
- .filter_map(fixtures::event::user)
- .filter_map(fixtures::event::user::created)
+ .filter_map(fixtures::event::stream::user)
+ .filter_map(fixtures::event::stream::user::created)
.filter(|event| future::ready(event.user == joiner))
.next()
.expect_some("a login created event is sent")
diff --git a/src/event/handlers/stream/test/message.rs b/src/event/handlers/stream/test/message.rs
index a80c896..4369996 100644
--- a/src/event/handlers/stream/test/message.rs
+++ b/src/event/handlers/stream/test/message.rs
@@ -44,8 +44,8 @@ async fn sending() {
// Verify that an event is delivered
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(event.message == message))
.next()
.expect_some("delivered message sent event")
@@ -89,8 +89,8 @@ async fn previously_sent() {
// Verify that an event is delivered
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(event.message == message))
.next()
.expect_some("delivered message sent event")
@@ -135,8 +135,8 @@ async fn sent_in_multiple_channels() {
// Verify the structure of the response.
let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.take(messages.len())
.collect::<Vec<_>>()
.expect_ready("events ready")
@@ -177,8 +177,8 @@ async fn sent_sequentially() {
// Verify the expected events in the expected order
let mut events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)));
for message in &messages {
@@ -222,8 +222,8 @@ async fn expiring() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_some("a deleted message event will be delivered")
@@ -261,8 +261,8 @@ async fn previously_expired() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_some("a deleted message event will be delivered")
@@ -300,8 +300,8 @@ async fn deleting() {
// Check for delete event
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_some("a deleted message event will be delivered")
@@ -339,8 +339,8 @@ async fn previously_deleted() {
// Check for delete event
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_some("a deleted message event will be delivered")
@@ -384,8 +384,8 @@ async fn previously_purged() {
// Check for delete event
events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_wait("no deleted message will be delivered")
diff --git a/src/event/handlers/stream/test/resume.rs b/src/event/handlers/stream/test/resume.rs
index 34fee4d..835d350 100644
--- a/src/event/handlers/stream/test/resume.rs
+++ b/src/event/handlers/stream/test/resume.rs
@@ -41,8 +41,8 @@ async fn resume() {
.expect("subscribe never fails");
let event = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(event.message == initial_message))
.next()
.expect_some("delivered event for initial message")
@@ -64,8 +64,8 @@ async fn resume() {
// Verify final events
let mut events = resumed
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(later_messages));
while let Some((event, message)) = events.next().expect_ready("event ready").await {
@@ -125,8 +125,8 @@ async fn serial_resume() {
// Check for expected events
let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(initial_messages))
.collect::<Vec<_>>()
.expect_ready("zipping a finite list of events is ready immediately")
@@ -168,8 +168,8 @@ async fn serial_resume() {
// Check for expected events
let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(resume_messages))
.collect::<Vec<_>>()
.expect_ready("zipping a finite list of events is ready immediately")
@@ -211,8 +211,8 @@ async fn serial_resume() {
// Check for expected events
let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(final_messages))
.collect::<Vec<_>>()
.expect_ready("zipping a finite list of events is ready immediately")
diff --git a/src/event/handlers/stream/test/setup.rs b/src/event/handlers/stream/test/setup.rs
index 5335055..992b962 100644
--- a/src/event/handlers/stream/test/setup.rs
+++ b/src/event/handlers/stream/test/setup.rs
@@ -38,8 +38,8 @@ async fn previously_completed() {
// Expect a login created event
let _ = events
- .filter_map(fixtures::event::user)
- .filter_map(fixtures::event::user::created)
+ .filter_map(fixtures::event::stream::user)
+ .filter_map(fixtures::event::stream::user::created)
.filter(|event| future::ready(event.user == owner))
.next()
.expect_some("a login created event is sent")
diff --git a/src/event/handlers/stream/test/token.rs b/src/event/handlers/stream/test/token.rs
index 2008323..e32b489 100644
--- a/src/event/handlers/stream/test/token.rs
+++ b/src/event/handlers/stream/test/token.rs
@@ -43,8 +43,8 @@ async fn terminates_on_token_expiry() {
];
events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
.expect_none("end of stream")
@@ -89,8 +89,8 @@ async fn terminates_on_logout() {
];
events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
.expect_none("end of stream")
@@ -139,8 +139,8 @@ async fn terminates_on_password_change() {
];
events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
.expect_none("end of stream")
diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs
index 15aa2c2..f567eb7 100644
--- a/src/message/handlers/delete/test.rs
+++ b/src/message/handlers/delete/test.rs
@@ -1,4 +1,5 @@
use axum::extract::{Path, State};
+use itertools::Itertools;
use crate::{message::app, test::fixtures};
@@ -29,7 +30,14 @@ pub async fn delete_message() {
// Verify the semantics
let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- assert!(!snapshot.messages.contains(&message));
+ let deleted = snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .exactly_one()
+ .expect("only one message has been deleted");
+ assert_eq!(response.id, deleted.id)
}
#[tokio::test]
diff --git a/src/message/history.rs b/src/message/history.rs
index 1a72c08..7585e1c 100644
--- a/src/message/history.rs
+++ b/src/message/history.rs
@@ -27,12 +27,6 @@ impl History {
self.message.clone()
}
- pub fn as_of(&self, resume_point: Sequence) -> Option<Message> {
- self.events()
- .filter(Sequence::up_to(resume_point))
- .collect()
- }
-
// Snapshot of this message as of all events recorded in this history.
pub fn as_snapshot(&self) -> Option<Message> {
self.events().collect()
diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs
deleted file mode 100644
index a30bb4b..0000000
--- a/src/test/fixtures/event.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-use std::future::{self, Ready};
-
-use crate::event::Event;
-
-pub fn channel(event: Event) -> Ready<Option<channel::Event>> {
- future::ready(match event {
- Event::Channel(channel) => Some(channel),
- _ => None,
- })
-}
-
-pub fn message(event: Event) -> Ready<Option<message::Event>> {
- future::ready(match event {
- Event::Message(event) => Some(event),
- _ => None,
- })
-}
-
-pub fn user(event: Event) -> Ready<Option<user::Event>> {
- future::ready(match event {
- Event::User(event) => Some(event),
- _ => None,
- })
-}
-
-pub mod channel {
- use std::future::{self, Ready};
-
- pub use crate::channel::Event;
- use crate::channel::event;
-
- pub fn created(event: Event) -> Ready<Option<event::Created>> {
- future::ready(match event {
- Event::Created(event) => Some(event),
- Event::Deleted(_) => None,
- })
- }
-
- pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> {
- future::ready(match event {
- Event::Deleted(event) => Some(event),
- Event::Created(_) => None,
- })
- }
-}
-
-pub mod message {
- use std::future::{self, Ready};
-
- pub use crate::message::Event;
- use crate::message::event;
-
- pub fn sent(event: Event) -> Ready<Option<event::Sent>> {
- future::ready(match event {
- Event::Sent(event) => Some(event),
- Event::Deleted(_) => None,
- })
- }
-
- pub fn deleted(event: Event) -> future::Ready<Option<event::Deleted>> {
- future::ready(match event {
- Event::Deleted(event) => Some(event),
- Event::Sent(_) => None,
- })
- }
-}
-
-pub mod user {
- use std::future::{self, Ready};
-
- pub use crate::user::Event;
- use crate::user::event;
-
- pub fn created(event: Event) -> Ready<Option<event::Created>> {
- future::ready(match event {
- Event::Created(event) => Some(event),
- })
- }
-}
diff --git a/src/test/fixtures/event/mod.rs b/src/test/fixtures/event/mod.rs
new file mode 100644
index 0000000..691cdeb
--- /dev/null
+++ b/src/test/fixtures/event/mod.rs
@@ -0,0 +1,74 @@
+use crate::event::Event;
+
+pub mod stream;
+
+pub fn channel(event: Event) -> Option<crate::channel::Event> {
+ match event {
+ Event::Channel(channel) => Some(channel),
+ _ => None,
+ }
+}
+
+pub fn message(event: Event) -> Option<crate::message::Event> {
+ match event {
+ Event::Message(event) => Some(event),
+ _ => None,
+ }
+}
+
+pub fn user(event: Event) -> Option<crate::user::Event> {
+ match event {
+ Event::User(event) => Some(event),
+ _ => None,
+ }
+}
+
+pub mod channel {
+ use crate::channel::{Event, event};
+
+ pub fn created(event: Event) -> Option<event::Created> {
+ match event {
+ Event::Created(event) => Some(event),
+ Event::Deleted(_) => None,
+ }
+ }
+
+ pub fn deleted(event: Event) -> Option<event::Deleted> {
+ match event {
+ Event::Deleted(event) => Some(event),
+ Event::Created(_) => None,
+ }
+ }
+}
+
+pub mod message {
+ use crate::message::{Event, event};
+
+ pub fn sent(event: Event) -> Option<event::Sent> {
+ match event {
+ Event::Sent(event) => Some(event),
+ Event::Deleted(_) => None,
+ }
+ }
+
+ pub fn deleted(event: Event) -> Option<event::Deleted> {
+ match event {
+ Event::Deleted(event) => Some(event),
+ Event::Sent(_) => None,
+ }
+ }
+}
+
+pub mod user {
+ use crate::user::{Event, event};
+
+ // This could be defined as `-> event::Created`. However, I want the interface to be consistent
+ // with the event stream transformers for other types, and we'd have to refactor the return type
+ // to `-> Option<event::Created>` the instant users sprout a second event, anyways.
+ #[allow(clippy::unnecessary_wraps)]
+ pub fn created(event: Event) -> Option<event::Created> {
+ match event {
+ Event::Created(event) => Some(event),
+ }
+ }
+}
diff --git a/src/test/fixtures/event/stream.rs b/src/test/fixtures/event/stream.rs
new file mode 100644
index 0000000..6c2a1bf
--- /dev/null
+++ b/src/test/fixtures/event/stream.rs
@@ -0,0 +1,62 @@
+use std::future::{self, Ready};
+
+use crate::{event::Event, test::fixtures::event};
+
+pub fn channel(event: Event) -> Ready<Option<crate::channel::Event>> {
+ future::ready(event::channel(event))
+}
+
+pub fn message(event: Event) -> Ready<Option<crate::message::Event>> {
+ future::ready(event::message(event))
+}
+
+pub fn user(event: Event) -> Ready<Option<crate::user::Event>> {
+ future::ready(event::user(event))
+}
+
+pub mod channel {
+ use std::future::{self, Ready};
+
+ use crate::{
+ channel::{Event, event},
+ test::fixtures::event::channel,
+ };
+
+ pub fn created(event: Event) -> Ready<Option<event::Created>> {
+ future::ready(channel::created(event))
+ }
+
+ pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> {
+ future::ready(channel::deleted(event))
+ }
+}
+
+pub mod message {
+ use std::future::{self, Ready};
+
+ use crate::{
+ message::{Event, event},
+ test::fixtures::event::message,
+ };
+
+ pub fn sent(event: Event) -> Ready<Option<event::Sent>> {
+ future::ready(message::sent(event))
+ }
+
+ pub fn deleted(event: Event) -> future::Ready<Option<event::Deleted>> {
+ future::ready(message::deleted(event))
+ }
+}
+
+pub mod user {
+ use std::future::{self, Ready};
+
+ use crate::{
+ test::fixtures::event::user,
+ user::{Event, event},
+ };
+
+ pub fn created(event: Event) -> Ready<Option<event::Created>> {
+ future::ready(user::created(event))
+ }
+}
diff --git a/src/user/history.rs b/src/user/history.rs
index ae7a561..72e0aee 100644
--- a/src/user/history.rs
+++ b/src/user/history.rs
@@ -2,7 +2,7 @@ use super::{
Id, User,
event::{Created, Event},
};
-use crate::event::{Instant, Sequence};
+use crate::event::Instant;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
@@ -24,12 +24,6 @@ impl History {
self.user.clone()
}
- pub fn as_of(&self, resume_point: Sequence) -> Option<User> {
- self.events()
- .filter(Sequence::up_to(resume_point))
- .collect()
- }
-
// Snapshot of this user, as of all events recorded in this history.
pub fn as_snapshot(&self) -> Option<User> {
self.events().collect()
diff --git a/ui/lib/session.svelte.js b/ui/lib/session.svelte.js
index d742dbe..838401c 100644
--- a/ui/lib/session.svelte.js
+++ b/ui/lib/session.svelte.js
@@ -64,27 +64,23 @@ class Session {
),
);
- static boot({ user, users, channels, messages, resume_point, heartbeat }) {
+ static boot({ user, resume_point, heartbeat, events }) {
const remote = r.State.boot({
currentUser: user,
- users,
- channels,
- messages,
resumePoint: resume_point,
heartbeat,
+ events,
});
const local = l.Channels.fromLocalStorage();
return new Session(remote, local);
}
- reboot({ user, users, channels, messages, resume_point, heartbeat }) {
+ reboot({ user, resume_point, heartbeat, events }) {
this.remote = r.State.boot({
currentUser: user,
- users,
- channels,
- messages,
resumePoint: resume_point,
heartbeat,
+ events,
});
}
diff --git a/ui/lib/state/remote/channels.svelte.js b/ui/lib/state/remote/channels.svelte.js
index b2888cb..1e40075 100644
--- a/ui/lib/state/remote/channels.svelte.js
+++ b/ui/lib/state/remote/channels.svelte.js
@@ -19,15 +19,6 @@ class Channel {
export class Channels {
all = $state([]);
- static boot(channels) {
- const all = channels.map((channel) => Channel.boot(channel));
- return new Channels({ all });
- }
-
- constructor({ all }) {
- this.all = all;
- }
-
add({ at, id, name }) {
this.all.push(Channel.boot({ at, id, name }));
}
diff --git a/ui/lib/state/remote/messages.svelte.js b/ui/lib/state/remote/messages.svelte.js
index 7ce28b4..1be001b 100644
--- a/ui/lib/state/remote/messages.svelte.js
+++ b/ui/lib/state/remote/messages.svelte.js
@@ -26,15 +26,6 @@ class Message {
export class Messages {
all = $state([]);
- static boot(messages) {
- const all = messages.map(Message.boot);
- return new Messages({ all });
- }
-
- constructor({ all }) {
- this.all = all;
- }
-
add({ id, at, channel, sender, body }) {
const message = Message.boot({ id, at, channel, sender, body });
this.all.push(message);
diff --git a/ui/lib/state/remote/state.svelte.js b/ui/lib/state/remote/state.svelte.js
index a30e0fe..fb46489 100644
--- a/ui/lib/state/remote/state.svelte.js
+++ b/ui/lib/state/remote/state.svelte.js
@@ -4,27 +4,25 @@ import { Messages } from './messages.svelte.js';
export class State {
currentUser = $state();
- users = $state();
- channels = $state();
- messages = $state();
+ users = $state(new Users());
+ channels = $state(new Channels());
+ messages = $state(new Messages());
- static boot({ currentUser, heartbeat, users, channels, messages, resumePoint }) {
- return new State({
+ static boot({ currentUser, heartbeat, resumePoint, events }) {
+ const state = new State({
currentUser: User.boot(currentUser),
heartbeat,
- users: Users.boot(users),
- channels: Channels.boot(channels),
- messages: Messages.boot(messages),
resumePoint,
});
+ for (const event of events) {
+ state.onEvent(event);
+ }
+ return state;
}
- constructor({ currentUser, heartbeat, users, channels, messages, resumePoint }) {
+ constructor({ currentUser, heartbeat, resumePoint }) {
this.currentUser = currentUser;
this.heartbeat = heartbeat;
- this.users = users;
- this.channels = channels;
- this.messages = messages;
this.resumePoint = resumePoint;
}
diff --git a/ui/lib/state/remote/users.svelte.js b/ui/lib/state/remote/users.svelte.js
index a15d1da..391ab3a 100644
--- a/ui/lib/state/remote/users.svelte.js
+++ b/ui/lib/state/remote/users.svelte.js
@@ -16,16 +16,7 @@ export class User {
}
export class Users {
- all = $state();
-
- static boot(users) {
- const all = new SvelteMap(users.map((user) => [user.id, User.boot(user)]));
- return new Users({ all });
- }
-
- constructor({ all }) {
- this.all = all;
- }
+ all = $state(new SvelteMap());
add({ id, name }) {
this.all.set(id, new User(id, name));