summaryrefslogtreecommitdiff
path: root/src/event/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-05 23:00:58 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-05 23:00:58 -0400
commit05de3c7b211727039b3912311aa4bab6787a7457 (patch)
tree08a3860b68391514390f42872ccc1cb4c6e6afd2 /src/event/app.rs
parentbc514e0ea5f0a553f15ab8275961907877181520 (diff)
parent6a10fcaf64938da52b326ea80013d9f30ed62a6c (diff)
Merge branch 'wip/boot'
Diffstat (limited to 'src/event/app.rs')
-rw-r--r--src/event/app.rs7
1 files changed, 4 insertions, 3 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
index d664ec7..141037d 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -6,7 +6,7 @@ use futures::{
use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;
-use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced};
+use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced};
use crate::{
channel::{self, repo::Provider as _},
message::{self, repo::Provider as _},
@@ -24,8 +24,9 @@ impl<'a> Events<'a> {
pub async fn subscribe(
&self,
- resume_at: Option<Sequence>,
+ resume_at: impl Into<ResumePoint>,
) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> {
+ let resume_at = resume_at.into();
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.events.subscribe();
@@ -65,7 +66,7 @@ impl<'a> Events<'a> {
Ok(replay.chain(live_messages))
}
- fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
+ fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
let filter = Sequence::after(resume_at);
move |event| future::ready(filter(event))
}