summaryrefslogtreecommitdiff
path: root/src/event/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/app.rs')
-rw-r--r--src/event/app.rs72
1 files changed, 72 insertions, 0 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
new file mode 100644
index 0000000..d664ec7
--- /dev/null
+++ b/src/event/app.rs
@@ -0,0 +1,72 @@
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+ Stream,
+};
+use itertools::Itertools as _;
+use sqlx::sqlite::SqlitePool;
+
+use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced};
+use crate::{
+ channel::{self, repo::Provider as _},
+ message::{self, repo::Provider as _},
+};
+
+pub struct Events<'a> {
+ db: &'a SqlitePool,
+ events: &'a Broadcaster,
+}
+
+impl<'a> Events<'a> {
+ pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self {
+ Self { db, events }
+ }
+
+ pub async fn subscribe(
+ &self,
+ resume_at: Option<Sequence>,
+ ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> {
+ // Subscribe before retrieving, to catch messages broadcast while we're
+ // querying the DB. We'll prune out duplicates later.
+ let live_messages = self.events.subscribe();
+
+ let mut tx = self.db.begin().await?;
+
+ let channels = tx.channels().replay(resume_at).await?;
+ let channel_events = channels
+ .iter()
+ .map(channel::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::after(resume_at))
+ .map(Event::from);
+
+ let messages = tx.messages().replay(resume_at).await?;
+ let message_events = messages
+ .iter()
+ .map(message::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::after(resume_at))
+ .map(Event::from);
+
+ let replay_events = channel_events
+ .merge_by(message_events, Sequence::merge)
+ .collect::<Vec<_>>();
+ let resume_live_at = replay_events.last().map(Sequenced::sequence);
+
+ let replay = stream::iter(replay_events);
+
+ let live_messages = live_messages
+ // Filtering on the broadcast resume point filters out messages
+ // before resume_at, and filters out messages duplicated from
+ // `replay_events`.
+ .flat_map(stream::iter)
+ .filter(Self::resume(resume_live_at));
+
+ Ok(replay.chain(live_messages))
+ }
+
+ fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
+ let filter = Sequence::after(resume_at);
+ move |event| future::ready(filter(event))
+ }
+}