diff options
Diffstat (limited to 'src/boot')
| -rw-r--r-- | src/boot/app.rs | 44 | ||||
| -rw-r--r-- | src/boot/handlers/boot/mod.rs | 20 | ||||
| -rw-r--r-- | src/boot/handlers/boot/test.rs | 133 | ||||
| -rw-r--r-- | src/boot/mod.rs | 19 |
4 files changed, 168 insertions, 48 deletions
diff --git a/src/boot/app.rs b/src/boot/app.rs index cd45c38..89eec12 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -1,10 +1,11 @@ +use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; use super::Snapshot; use crate::{ channel::{self, repo::Provider as _}, - event::{Heartbeat, repo::Provider as _}, - message::repo::Provider as _, + event::{Event, Sequence, repo::Provider as _}, + message::{self, repo::Provider as _}, name, user::{self, repo::Provider as _}, }; @@ -21,7 +22,6 @@ impl<'a> Boot<'a> { pub async fn snapshot(&self) -> Result<Snapshot, Error> { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; - let heartbeat = Heartbeat::TIMEOUT; let users = tx.users().all(resume_point).await?; let channels = tx.channels().all(resume_point).await?; @@ -29,27 +29,35 @@ impl<'a> Boot<'a> { tx.commit().await?; - let users = users - .into_iter() - .filter_map(|user| user.as_of(resume_point)) - .collect(); + let user_events = users + .iter() + .map(user::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::up_to(resume_point)) + .map(Event::from); - let channels = channels - .into_iter() - .filter_map(|channel| channel.as_of(resume_point)) - .collect(); + let channel_events = channels + .iter() + .map(channel::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::up_to(resume_point)) + .map(Event::from); + + let message_events = messages + .iter() + .map(message::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::up_to(resume_point)) + .map(Event::from); - let messages = messages - .into_iter() - .filter_map(|message| message.as_of(resume_point)) + let events = user_events + .merge_by(channel_events, Sequence::merge) + .merge_by(message_events, Sequence::merge) .collect(); Ok(Snapshot { resume_point, - heartbeat, - users, - channels, - messages, + events, }) } } diff --git a/src/boot/handlers/boot/mod.rs b/src/boot/handlers/boot/mod.rs index 010f57b..49691f7 100644 --- a/src/boot/handlers/boot/mod.rs +++ b/src/boot/handlers/boot/mod.rs @@ -1,17 +1,26 @@ +use std::time::Duration; + use axum::{ extract::{Json, State}, response::{self, IntoResponse}, }; +use serde::Serialize; -use crate::{app::App, boot::Snapshot, error::Internal, token::extract::Identity, user::User}; +use crate::{ + app::App, boot::Snapshot, error::Internal, event::Heartbeat, token::extract::Identity, + user::User, +}; #[cfg(test)] mod test; pub async fn handler(State(app): State<App>, identity: Identity) -> Result<Response, Internal> { let snapshot = app.boot().snapshot().await?; + let heartbeat = Heartbeat::TIMEOUT; + Ok(Response { user: identity.user, + heartbeat, snapshot, }) } @@ -19,6 +28,8 @@ pub async fn handler(State(app): State<App>, identity: Identity) -> Result<Respo #[derive(serde::Serialize)] pub struct Response { pub user: User, + #[serde(serialize_with = "as_seconds")] + pub heartbeat: Duration, #[serde(flatten)] pub snapshot: Snapshot, } @@ -28,3 +39,10 @@ impl IntoResponse for Response { Json(self).into_response() } } + +fn as_seconds<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error> +where + S: serde::Serializer, +{ + duration.as_secs().serialize(serializer) +} diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs index 0a7622b..1e590a7 100644 --- a/src/boot/handlers/boot/test.rs +++ b/src/boot/handlers/boot/test.rs @@ -1,4 +1,5 @@ use axum::extract::State; +use itertools::Itertools as _; use crate::test::fixtures; @@ -15,7 +16,7 @@ async fn returns_identity() { } #[tokio::test] -async fn includes_logins() { +async fn includes_users() { let app = fixtures::scratch_app().await; let spectator = fixtures::user::create(&app, &fixtures::now()).await; @@ -24,7 +25,15 @@ async fn includes_logins() { .await .expect("boot always succeeds"); - assert!(response.snapshot.users.contains(&spectator)); + let created = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::user) + .filter_map(fixtures::event::user::created) + .exactly_one() + .expect("only one user has been created"); + assert_eq!(spectator, created.user) } #[tokio::test] @@ -37,7 +46,15 @@ async fn includes_channels() { .await .expect("boot always succeeds"); - assert!(response.snapshot.channels.contains(&channel)); + let created = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + assert_eq!(channel, created.channel); } #[tokio::test] @@ -52,11 +69,19 @@ async fn includes_messages() { .await .expect("boot always succeeds"); - assert!(response.snapshot.messages.contains(&message)); + let sent = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .exactly_one() + .expect("only one message has been sent"); + assert_eq!(message, sent.message); } #[tokio::test] -async fn excludes_expired_messages() { +async fn includes_expired_messages() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; @@ -73,11 +98,32 @@ async fn excludes_expired_messages() { .await .expect("boot always succeeds"); - assert!(!response.snapshot.messages.contains(&expired_message)); + let sent = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .exactly_one() + .expect("only one message has been sent"); + // We don't expect `expired_message` to match the event exactly, as the body will have been + // tombstoned and the message given a `deleted_at` date. + assert_eq!(expired_message.id, sent.message.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .exactly_one() + .expect("only one message has expired"); + assert_eq!(expired_message.id, deleted.id); } #[tokio::test] -async fn excludes_deleted_messages() { +async fn includes_deleted_messages() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; @@ -93,11 +139,32 @@ async fn excludes_deleted_messages() { .await .expect("boot always succeeds"); - assert!(!response.snapshot.messages.contains(&deleted_message)); + let sent = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .exactly_one() + .expect("only one message has been sent"); + // We don't expect `deleted_message` to match the event exactly, as the body will have been + // tombstoned and the message given a `deleted_at` date. + assert_eq!(deleted_message.id, sent.message.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::deleted) + .exactly_one() + .expect("only one message has been deleted"); + assert_eq!(deleted_message.id, deleted.id); } #[tokio::test] -async fn excludes_expired_channels() { +async fn includes_expired_channels() { let app = fixtures::scratch_app().await; let expired_channel = fixtures::channel::create(&app, &fixtures::ancient()).await; @@ -111,11 +178,32 @@ async fn excludes_expired_channels() { .await .expect("boot always succeeds"); - assert!(!response.snapshot.channels.contains(&expired_channel)); + let created = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + // We don't expect `expired_channel` to match the event exactly, as the name will have been + // tombstoned and the channel given a `deleted_at` date. + assert_eq!(expired_channel.id, created.channel.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .exactly_one() + .expect("only one channel has expired"); + assert_eq!(expired_channel.id, deleted.id); } #[tokio::test] -async fn excludes_deleted_channels() { +async fn includes_deleted_channels() { let app = fixtures::scratch_app().await; let deleted_channel = fixtures::channel::create(&app, &fixtures::now()).await; @@ -129,5 +217,26 @@ async fn excludes_deleted_channels() { .await .expect("boot always succeeds"); - assert!(!response.snapshot.channels.contains(&deleted_channel)); + let created = response + .snapshot + .events + .iter() + .cloned() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::created) + .exactly_one() + .expect("only one channel has been created"); + // We don't expect `deleted_channel` to match the event exactly, as the name will have been + // tombstoned and the channel given a `deleted_at` date. + assert_eq!(deleted_channel.id, created.channel.id); + + let deleted = response + .snapshot + .events + .into_iter() + .filter_map(fixtures::event::channel) + .filter_map(fixtures::event::channel::deleted) + .exactly_one() + .expect("only one channel has been deleted"); + assert_eq!(deleted_channel.id, deleted.id); } diff --git a/src/boot/mod.rs b/src/boot/mod.rs index 48da4f0..e0d35d9 100644 --- a/src/boot/mod.rs +++ b/src/boot/mod.rs @@ -1,8 +1,4 @@ -use std::time::Duration; - -use serde::Serialize; - -use crate::{channel::Channel, event::Sequence, message::Message, user::User}; +use crate::{event::Event, event::Sequence}; pub mod app; pub mod handlers; @@ -10,16 +6,5 @@ pub mod handlers; #[derive(serde::Serialize)] pub struct Snapshot { pub resume_point: Sequence, - #[serde(serialize_with = "as_seconds")] - pub heartbeat: Duration, - pub users: Vec<User>, - pub channels: Vec<Channel>, - pub messages: Vec<Message>, -} - -fn as_seconds<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error> -where - S: serde::Serializer, -{ - duration.as_secs().serialize(serializer) + pub events: Vec<Event>, } |
