summaryrefslogtreecommitdiff
path: root/src/events/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/app.rs')
-rw-r--r--src/events/app.rs24
1 files changed, 18 insertions, 6 deletions
diff --git a/src/events/app.rs b/src/events/app.rs
index 043a29b..134e86a 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -11,7 +11,7 @@ use sqlx::sqlite::SqlitePool;
use super::{
broadcaster::Broadcaster,
repo::message::Provider as _,
- types::{self, ResumePoint},
+ types::{self, ChannelEvent, ResumePoint},
};
use crate::{
clock::DateTime,
@@ -66,6 +66,17 @@ impl<'a> Events<'a> {
let mut tx = self.db.begin().await?;
let channels = tx.channels().all().await?;
+ let created_events = {
+ let resume_at = resume_at.clone();
+ let channels = channels.clone();
+ stream::iter(
+ channels
+ .into_iter()
+ .map(ChannelEvent::created)
+ .filter(move |event| resume_at.not_after(event)),
+ )
+ };
+
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.broadcaster.subscribe();
@@ -104,9 +115,9 @@ impl<'a> Events<'a> {
// stored_messages.
.filter(Self::resume(resume_live_at));
- Ok(replay
- .chain(live_messages)
- .scan(resume_at, |resume_point, event| {
+ Ok(created_events.chain(replay).chain(live_messages).scan(
+ resume_at,
+ |resume_point, event| {
let channel = &event.channel.id;
let sequence = event.sequence;
resume_point.advance(channel, sequence);
@@ -114,13 +125,14 @@ impl<'a> Events<'a> {
let event = types::ResumableEvent(resume_point.clone(), event);
future::ready(Some(event))
- }))
+ },
+ ))
}
fn resume(
resume_at: ResumePoint,
) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
- move |event| future::ready(resume_at < event.sequence())
+ move |event| future::ready(resume_at.not_after(event))
}
fn skip_expired(
expire_at: &DateTime,