summaryrefslogtreecommitdiff
path: root/src/event/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-30 02:01:31 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-30 02:01:31 -0400
commit50a382528288248381b07c25719cbc9a519b4c81 (patch)
tree01fc7a2997c3678aa687a75e2e7d56ef0876b450 /src/event/app.rs
parent70591c5ac10069a4ae649bd6f79d769da9e32a98 (diff)
Resume points are no longer optional.
This is an inconsequential change for actual clients, since "resume from the beginning" was never a preferred mode of operation, and it simplifies some internals. It should also mean we get better query plans where `coalesce(cond, true)` was previously being used.
Diffstat (limited to 'src/event/app.rs')
-rw-r--r--src/event/app.rs9
1 files changed, 4 insertions, 5 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
index c754388..b309245 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -6,7 +6,7 @@ use futures::{
use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;
-use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced};
+use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced};
use crate::{
channel::{self, repo::Provider as _},
login::{self, repo::Provider as _},
@@ -26,9 +26,8 @@ impl<'a> Events<'a> {
pub async fn subscribe(
&self,
- resume_at: impl Into<ResumePoint>,
+ resume_at: Sequence,
) -> Result<impl Stream<Item = Event> + std::fmt::Debug, Error> {
- let resume_at = resume_at.into();
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.events.subscribe();
@@ -63,7 +62,7 @@ impl<'a> Events<'a> {
.merge_by(channel_events, Sequence::merge)
.merge_by(message_events, Sequence::merge)
.collect::<Vec<_>>();
- let resume_live_at = replay_events.last().map(Sequenced::sequence);
+ let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence);
let replay = stream::iter(replay_events);
@@ -77,7 +76,7 @@ impl<'a> Events<'a> {
Ok(replay.chain(live_messages))
}
- fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
+ fn resume(resume_at: Sequence) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
let filter = Sequence::after(resume_at);
move |event| future::ready(filter(event))
}