diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-06-18 23:33:02 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-06-20 22:27:35 -0400 |
| commit | 4b522c804db8155f74a734c95ed962d996b2c692 (patch) | |
| tree | f5369ba821ba22cdd307b7a3a411e95ad1a1e896 | |
| parent | 057bbef5f37a4051615ad23661a0b4853b61162e (diff) | |
Include historical events in the boot response.
The returned events are all events up to and including the `resume_point` in the same response. If combined with the events from `/api/events?resume_point=x`, using the same `resume_point`, the client will have a complete event history, less any events from histories that have been purged.
| -rw-r--r-- | docs/api/boot.md | 58 | ||||
| -rw-r--r-- | src/boot/app.rs | 50 | ||||
| -rw-r--r-- | src/boot/handlers/boot/test.rs | 125 | ||||
| -rw-r--r-- | src/boot/mod.rs | 5 |
4 files changed, 221 insertions, 17 deletions
diff --git a/docs/api/boot.md b/docs/api/boot.md index 7e3dda3..1f6e619 100644 --- a/docs/api/boot.md +++ b/docs/api/boot.md @@ -21,8 +21,8 @@ sequenceDiagram Client initialization serves three purposes: - It confirms that the client's [identity token](./authentication.md) is valid, and tells the client what user that token is associated with. -- It provides an initial snapshot of the state of the service. -- It provides a resume point for the [event stream](./events.md), which allows clients to consume events starting from the moment the snapshot was created. +- It provides an initial event collection. +- It provides a resume point for the [event stream](./events.md), which allows clients to consume events starting after the initial event collection. ## `GET /api/boot` @@ -43,6 +43,56 @@ This endpoint will respond with a status of }, "resume_point": 1312, "heartbeat": 30, + "events": [ + { + "type": "user", + "event": "created", + "at": "2025-04-14T23:58:10.421901Z", + "id": "U1234abcd", + "name": "example username" + }, + { + "type": "channel", + "event": "created", + "at": "2025-04-14T23:58:11.421901Z", + "id": "C1234abcd", + "name": "nonsense and such" + }, + { + "type": "message", + "event": "sent", + "at": "2024-09-27T23:19:10.208147Z", + "channel": "C1234abcd", + "sender": "U1234abcd", + "id": "M1312acab", + "body": "beep" + }, + { + "type": "message", + "event": "sent", + "at": "2025-06-19T15:14:40.431627Z", + "channel": "Ccfdryfdb4krpy77", + "sender": "U888j6fyc8ccrnkf", + "id": "Mc6jk823wjc82734", + "body": "test" + }, + { + "type": "channel", + "event": "created", + "at": "2025-06-19T15:14:44.764263Z", + "id": "C2d9y6wckph3n36x", + "name": "noob" + }, + { + "type": "message", + "event": "sent", + "at": "2025-06-19T15:29:47.376455Z", + "channel": "Ccfdryfdb4krpy77", + "sender": "U888j6fyc8ccrnkf", + "id": "M3twnj7rfk2ph744", + "body": "test" + } + ], "users": [ { "id": "U1234abcd", @@ -75,10 +125,14 @@ The response will include the following fields: | `user` | object | The details of the caller's identity. | | `resume_point` | integer | A resume point for [events](./events.md), such that the event stream will begin immediately after the included snapshot. | | `heartbeat` | integer | The [heartbeat timeout](./events.md#heartbeat-events), in seconds, for events. | +| `events` | array of object | The events on the server up to the resume point. | | `users` | array of object | A snapshot of the users present in the service. | | `channels` | array of object | A snapshot of the channels present in the service. | | `messages` | array of object | A snapshot of the messages present in the service. | +Each element of the +`events` object is an event, as described in [Events](./events.md). Events are provided in the same order as they would appear in the event stream response. + The `user` object will include the following fields: | Field | Type | Description | diff --git a/src/boot/app.rs b/src/boot/app.rs index f531afe..690bcf4 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -1,10 +1,11 @@ +use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; use super::Snapshot; use crate::{ channel::{self, repo::Provider as _}, - event::repo::Provider as _, - message::repo::Provider as _, + event::{Event, Sequence, repo::Provider as _}, + message::{self, repo::Provider as _}, name, user::{self, repo::Provider as _}, }; @@ -22,32 +23,59 @@ impl<'a> Boot<'a> { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; - let users = tx.users().all(resume_point).await?; - let channels = tx.channels().all(resume_point).await?; - let messages = tx.messages().all(resume_point).await?; + let user_histories = tx.users().all(resume_point).await?; + let channel_histories = tx.channels().all(resume_point).await?; + let message_histories = tx.messages().all(resume_point).await?; tx.commit().await?; - let users = users - .into_iter() + let users = user_histories + .iter() .filter_map(|user| user.as_of(resume_point)) .collect(); - let channels = channels - .into_iter() + let channels = channel_histories + .iter() .filter_map(|channel| channel.as_of(resume_point)) .collect(); - let messages = messages - .into_iter() + let messages = message_histories + .iter() .filter_map(|message| message.as_of(resume_point)) .collect(); + let user_events = user_histories + .iter() + .map(user::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::up_to(resume_point)) + .map(Event::from); + + let channel_events = channel_histories + .iter() + .map(channel::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::up_to(resume_point)) + .map(Event::from); + + let message_events = message_histories + .iter() + .map(message::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::up_to(resume_point)) + .map(Event::from); + + let events = user_events + .merge_by(channel_events, Sequence::merge) + .merge_by(message_events, Sequence::merge) + .collect(); + Ok(Snapshot { resume_point, users, channels, messages, + events, }) } } diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs index 0a7622b..d68618e 100644 --- a/src/boot/handlers/boot/test.rs +++ b/src/boot/handlers/boot/test.rs @@ -1,4 +1,5 @@ use axum::extract::State; +use itertools::Itertools as _; use crate::test::fixtures; @@ -15,7 +16,7 @@ async fn returns_identity() { } #[tokio::test] -async fn includes_logins() { +async fn includes_users() { let app = fixtures::scratch_app().await; let spectator = fixtures::user::create(&app, &fixtures::now()).await; @@ -25,6 +26,16 @@ async fn includes_logins() { .expect("boot always succeeds"); assert!(response.snapshot.users.contains(&spectator)); + + let created = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::user) + .filter_map(fixtures::event::user::created) + .exactly_one() + .expect("only one user has been created"); + assert_eq!(spectator, created.user) } #[tokio::test] @@ -38,6 +49,16 @@ async fn includes_channels() { .expect("boot always succeeds"); assert!(response.snapshot.channels.contains(&channel)); + + let created = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + assert_eq!(channel, created.channel); } #[tokio::test] @@ -53,6 +74,16 @@ async fn includes_messages() { .expect("boot always succeeds"); assert!(response.snapshot.messages.contains(&message)); + + let sent = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .exactly_one() + .expect("only one message has been sent"); + assert_eq!(message, sent.message); } #[tokio::test] @@ -74,6 +105,29 @@ async fn excludes_expired_messages() { .expect("boot always succeeds"); assert!(!response.snapshot.messages.contains(&expired_message)); + + let sent = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .exactly_one() + .expect("only one message has been sent"); + // We don't expect `expired_message` to match the event exactly, as the body will have been + // tombstoned and the message given a `deleted_at` date. + assert_eq!(expired_message.id, sent.message.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .exactly_one() + .expect("only one message has expired"); + assert_eq!(expired_message.id, deleted.id); } #[tokio::test] @@ -94,6 +148,29 @@ async fn excludes_deleted_messages() { .expect("boot always succeeds"); assert!(!response.snapshot.messages.contains(&deleted_message)); + + let sent = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .exactly_one() + .expect("only one message has been sent"); + // We don't expect `deleted_message` to match the event exactly, as the body will have been + // tombstoned and the message given a `deleted_at` date. + assert_eq!(deleted_message.id, sent.message.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .exactly_one() + .expect("only one message has been deleted"); + assert_eq!(deleted_message.id, deleted.id); } #[tokio::test] @@ -112,6 +189,29 @@ async fn excludes_expired_channels() { .expect("boot always succeeds"); assert!(!response.snapshot.channels.contains(&expired_channel)); + + let created = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + // We don't expect `expired_channel` to match the event exactly, as the name will have been + // tombstoned and the channel given a `deleted_at` date. + assert_eq!(expired_channel.id, created.channel.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .exactly_one() + .expect("only one channel has expired"); + assert_eq!(expired_channel.id, deleted.id); } #[tokio::test] @@ -130,4 +230,27 @@ async fn excludes_deleted_channels() { .expect("boot always succeeds"); assert!(!response.snapshot.channels.contains(&deleted_channel)); + + let created = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + // We don't expect `deleted_channel` to match the event exactly, as the name will have been + // tombstoned and the channel given a `deleted_at` date. + assert_eq!(deleted_channel.id, created.channel.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .exactly_one() + .expect("only one channel has been deleted"); + assert_eq!(deleted_channel.id, deleted.id); } diff --git a/src/boot/mod.rs b/src/boot/mod.rs index 3fc2c9e..148e87d 100644 --- a/src/boot/mod.rs +++ b/src/boot/mod.rs @@ -1,6 +1,4 @@ -use serde::Serialize; - -use crate::{channel::Channel, event::Sequence, message::Message, user::User}; +use crate::{channel::Channel, event::Event, event::Sequence, message::Message, user::User}; pub mod app; pub mod handlers; @@ -11,4 +9,5 @@ pub struct Snapshot { pub users: Vec<User>, pub channels: Vec<Channel>, pub messages: Vec<Message>, + pub events: Vec<Event>, } |
