use futures::{ Stream, future, stream::{self, StreamExt as _}, }; use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; use super::{Event, Sequence, Sequenced, broadcaster::Broadcaster}; use crate::{ conversation::{self, repo::Provider as _}, db::NotFound, message::{self, repo::Provider as _}, name, user::{self, repo::Provider as _}, vapid, vapid::repo::Provider as _, }; pub struct Events { db: SqlitePool, events: Broadcaster, } impl Events { pub const fn new(db: SqlitePool, events: Broadcaster) -> Self { Self { db, events } } pub async fn subscribe( &self, resume_at: Sequence, ) -> Result + std::fmt::Debug + use<>, Error> { // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); let mut tx = self.db.begin().await?; let users = tx.users().replay(resume_at).await?; let user_events = users .iter() .map(user::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); let conversations = tx.conversations().replay(resume_at).await?; let conversation_events = conversations .iter() .map(conversation::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); let messages = tx.messages().replay(resume_at).await?; let message_events = messages .iter() .map(message::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); let vapid = tx.vapid().current().await.optional()?; let vapid_events = vapid .iter() .flat_map(vapid::History::events) .filter(Sequence::after(resume_at)) .map(Event::from); let replay_events = user_events .merge_by(conversation_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .merge_by(vapid_events, Sequence::merge) .collect::>(); let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence); let replay = stream::iter(replay_events); let live_messages = live_messages // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // `replay_events`. .flat_map(stream::iter) .filter(Self::resume(resume_live_at)); Ok(replay.chain(live_messages)) } fn resume(resume_at: Sequence) -> impl for<'m> FnMut(&'m Event) -> future::Ready + use<> { let filter = Sequence::after(resume_at); move |event| future::ready(filter(event)) } } #[derive(Debug, thiserror::Error)] #[error(transparent)] pub enum Error { Database(#[from] sqlx::Error), Name(#[from] name::Error), Ecdsa(#[from] p256::ecdsa::Error), } impl From for Error { fn from(error: user::repo::LoadError) -> Self { use user::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } } impl From for Error { fn from(error: conversation::repo::LoadError) -> Self { use conversation::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } } impl From for Error { fn from(error: vapid::repo::Error) -> Self { use vapid::repo::Error; match error { Error::Database(error) => error.into(), Error::Ecdsa(error) => error.into(), } } }