use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; use super::Snapshot; use crate::{ conversation::{self, repo::Provider as _}, db::{self, NotFound as _}, error::failed::{Failed, ResultExt as _}, event::{Event, Sequence, repo::Provider as _}, message::{self, repo::Provider as _}, user::{self, repo::Provider as _}, vapid::{self, repo::Provider as _}, }; pub struct Boot { db: SqlitePool, } impl Boot { pub const fn new(db: SqlitePool) -> Self { Self { db } } pub async fn snapshot(&self) -> Result { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let resume_point = tx .sequence() .current() .await .fail("Failed to load resume point")?; let users = tx .users() .all(resume_point) .await .fail("Failed to load user events")?; let conversations = tx .conversations() .all(resume_point) .await .fail("Failed to load conversation events")?; let messages = tx .messages() .all(resume_point) .await .fail("Failed to load message events")?; let vapid = tx .vapid() .current() .await .optional() .fail("Failed to load VAPID key events")?; tx.commit().await.fail(db::failed::COMMIT)?; let user_events = users .iter() .map(user::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::up_to(resume_point)) .map(Event::from); let conversation_events = conversations .iter() .map(conversation::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::up_to(resume_point)) .map(Event::from); let message_events = messages .iter() .map(message::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::up_to(resume_point)) .map(Event::from); let vapid_events = vapid .iter() .flat_map(vapid::History::events) .filter(Sequence::up_to(resume_point)) .map(Event::from); let events = user_events .merge_by(conversation_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .merge_by(vapid_events, Sequence::merge) .collect(); Ok(Snapshot { resume_point, events, }) } }