diff options
Diffstat (limited to 'src/event/app.rs')
| -rw-r--r-- | src/event/app.rs | 72 |
1 files changed, 72 insertions, 0 deletions
diff --git a/src/event/app.rs b/src/event/app.rs new file mode 100644 index 0000000..d664ec7 --- /dev/null +++ b/src/event/app.rs @@ -0,0 +1,72 @@ +use futures::{ + future, + stream::{self, StreamExt as _}, + Stream, +}; +use itertools::Itertools as _; +use sqlx::sqlite::SqlitePool; + +use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; +use crate::{ + channel::{self, repo::Provider as _}, + message::{self, repo::Provider as _}, +}; + +pub struct Events<'a> { + db: &'a SqlitePool, + events: &'a Broadcaster, +} + +impl<'a> Events<'a> { + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } + } + + pub async fn subscribe( + &self, + resume_at: Option<Sequence>, + ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> { + // Subscribe before retrieving, to catch messages broadcast while we're + // querying the DB. We'll prune out duplicates later. + let live_messages = self.events.subscribe(); + + let mut tx = self.db.begin().await?; + + let channels = tx.channels().replay(resume_at).await?; + let channel_events = channels + .iter() + .map(channel::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::after(resume_at)) + .map(Event::from); + + let messages = tx.messages().replay(resume_at).await?; + let message_events = messages + .iter() + .map(message::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::after(resume_at)) + .map(Event::from); + + let replay_events = channel_events + .merge_by(message_events, Sequence::merge) + .collect::<Vec<_>>(); + let resume_live_at = replay_events.last().map(Sequenced::sequence); + + let replay = stream::iter(replay_events); + + let live_messages = live_messages + // Filtering on the broadcast resume point filters out messages + // before resume_at, and filters out messages duplicated from + // `replay_events`. + .flat_map(stream::iter) + .filter(Self::resume(resume_live_at)); + + Ok(replay.chain(live_messages)) + } + + fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { + let filter = Sequence::after(resume_at); + move |event| future::ready(filter(event)) + } +} |
