summaryrefslogtreecommitdiff
path: root/src/event
diff options
context:
space:
mode:
Diffstat (limited to 'src/event')
-rw-r--r--src/event/app.rs7
-rw-r--r--src/event/mod.rs2
-rw-r--r--src/event/routes.rs4
-rw-r--r--src/event/sequence.rs5
4 files changed, 11 insertions, 7 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
index d664ec7..141037d 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -6,7 +6,7 @@ use futures::{
use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;
-use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced};
+use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced};
use crate::{
channel::{self, repo::Provider as _},
message::{self, repo::Provider as _},
@@ -24,8 +24,9 @@ impl<'a> Events<'a> {
pub async fn subscribe(
&self,
- resume_at: Option<Sequence>,
+ resume_at: impl Into<ResumePoint>,
) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> {
+ let resume_at = resume_at.into();
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.events.subscribe();
@@ -65,7 +66,7 @@ impl<'a> Events<'a> {
Ok(replay.chain(live_messages))
}
- fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
+ fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
let filter = Sequence::after(resume_at);
move |event| future::ready(filter(event))
}
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 1349fe6..e748d66 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -12,6 +12,8 @@ pub use self::{
sequence::{Instant, Sequence, Sequenced},
};
+pub type ResumePoint = Option<Sequence>;
+
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Event {
#[serde(flatten)]
diff --git a/src/event/routes.rs b/src/event/routes.rs
index 5b9c7e3..de6d248 100644
--- a/src/event/routes.rs
+++ b/src/event/routes.rs
@@ -14,7 +14,7 @@ use super::{extract::LastEventId, Event};
use crate::{
app::App,
error::{Internal, Unauthorized},
- event::{Sequence, Sequenced as _},
+ event::{ResumePoint, Sequence, Sequenced as _},
token::{app::ValidateError, extract::Identity},
};
@@ -27,7 +27,7 @@ pub fn router() -> Router<App> {
#[derive(Default, serde::Deserialize)]
struct EventsQuery {
- resume_point: Option<Sequence>,
+ resume_point: ResumePoint,
}
async fn events(
diff --git a/src/event/sequence.rs b/src/event/sequence.rs
index fbe3711..ceb5bcb 100644
--- a/src/event/sequence.rs
+++ b/src/event/sequence.rs
@@ -1,5 +1,6 @@
use std::fmt;
+use super::ResumePoint;
use crate::clock::DateTime;
#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)]
@@ -39,14 +40,14 @@ impl fmt::Display for Sequence {
}
impl Sequence {
- pub fn up_to<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool
+ pub fn up_to<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool
where
E: Sequenced,
{
move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point)
}
- pub fn after<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool
+ pub fn after<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool
where
E: Sequenced,
{