diff options
Diffstat (limited to 'src/event')
| -rw-r--r-- | src/event/app.rs | 7 | ||||
| -rw-r--r-- | src/event/mod.rs | 2 | ||||
| -rw-r--r-- | src/event/routes.rs | 4 | ||||
| -rw-r--r-- | src/event/sequence.rs | 5 |
4 files changed, 11 insertions, 7 deletions
diff --git a/src/event/app.rs b/src/event/app.rs index d664ec7..141037d 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -6,7 +6,7 @@ use futures::{ use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; -use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; +use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, message::{self, repo::Provider as _}, @@ -24,8 +24,9 @@ impl<'a> Events<'a> { pub async fn subscribe( &self, - resume_at: Option<Sequence>, + resume_at: impl Into<ResumePoint>, ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> { + let resume_at = resume_at.into(); // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); @@ -65,7 +66,7 @@ impl<'a> Events<'a> { Ok(replay.chain(live_messages)) } - fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { + fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { let filter = Sequence::after(resume_at); move |event| future::ready(filter(event)) } diff --git a/src/event/mod.rs b/src/event/mod.rs index 1349fe6..e748d66 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -12,6 +12,8 @@ pub use self::{ sequence::{Instant, Sequence, Sequenced}, }; +pub type ResumePoint = Option<Sequence>; + #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Event { #[serde(flatten)] diff --git a/src/event/routes.rs b/src/event/routes.rs index 5b9c7e3..de6d248 100644 --- a/src/event/routes.rs +++ b/src/event/routes.rs @@ -14,7 +14,7 @@ use super::{extract::LastEventId, Event}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{Sequence, Sequenced as _}, + event::{ResumePoint, Sequence, Sequenced as _}, token::{app::ValidateError, extract::Identity}, }; @@ -27,7 +27,7 @@ pub fn router() -> Router<App> { #[derive(Default, serde::Deserialize)] struct EventsQuery { - resume_point: Option<Sequence>, + resume_point: ResumePoint, } async fn events( diff --git a/src/event/sequence.rs b/src/event/sequence.rs index fbe3711..ceb5bcb 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -1,5 +1,6 @@ use std::fmt; +use super::ResumePoint; use crate::clock::DateTime; #[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] @@ -39,14 +40,14 @@ impl fmt::Display for Sequence { } impl Sequence { - pub fn up_to<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool + pub fn up_to<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point) } - pub fn after<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool + pub fn after<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { |
