use futures::{ future, stream::{self, StreamExt as _}, Stream, }; use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, login::{self, repo::Provider as _}, message::{self, repo::Provider as _}, }; pub struct Events<'a> { db: &'a SqlitePool, events: &'a Broadcaster, } impl<'a> Events<'a> { pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { Self { db, events } } pub async fn subscribe( &self, resume_at: impl Into, ) -> Result + std::fmt::Debug, sqlx::Error> { let resume_at = resume_at.into(); // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); let mut tx = self.db.begin().await?; let logins = tx.logins().replay(resume_at).await?; let login_events = logins .iter() .map(login::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); let channels = tx.channels().replay(resume_at).await?; let channel_events = channels .iter() .map(channel::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); let messages = tx.messages().replay(resume_at).await?; let message_events = messages .iter() .map(message::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); let replay_events = login_events .merge_by(channel_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .collect::>(); let resume_live_at = replay_events.last().map(Sequenced::sequence); let replay = stream::iter(replay_events); let live_messages = live_messages // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // `replay_events`. .flat_map(stream::iter) .filter(Self::resume(resume_live_at)); Ok(replay.chain(live_messages)) } fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready { let filter = Sequence::after(resume_at); move |event| future::ready(filter(event)) } }