summaryrefslogtreecommitdiff
path: root/src/event
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-30 02:01:31 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-30 02:01:31 -0400
commit50a382528288248381b07c25719cbc9a519b4c81 (patch)
tree01fc7a2997c3678aa687a75e2e7d56ef0876b450 /src/event
parent70591c5ac10069a4ae649bd6f79d769da9e32a98 (diff)
Resume points are no longer optional.
This is an inconsequential change for actual clients, since "resume from the beginning" was never a preferred mode of operation, and it simplifies some internals. It should also mean we get better query plans where `coalesce(cond, true)` was previously being used.
Diffstat (limited to 'src/event')
-rw-r--r--src/event/app.rs9
-rw-r--r--src/event/mod.rs2
-rw-r--r--src/event/routes/get.rs10
-rw-r--r--src/event/routes/test/channel.rs91
-rw-r--r--src/event/routes/test/invite.rs26
-rw-r--r--src/event/routes/test/message.rs115
-rw-r--r--src/event/routes/test/resume.rs12
-rw-r--r--src/event/routes/test/setup.rs13
-rw-r--r--src/event/routes/test/token.rs19
-rw-r--r--src/event/sequence.rs9
10 files changed, 203 insertions, 103 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
index c754388..b309245 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -6,7 +6,7 @@ use futures::{
use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;
-use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced};
+use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced};
use crate::{
channel::{self, repo::Provider as _},
login::{self, repo::Provider as _},
@@ -26,9 +26,8 @@ impl<'a> Events<'a> {
pub async fn subscribe(
&self,
- resume_at: impl Into<ResumePoint>,
+ resume_at: Sequence,
) -> Result<impl Stream<Item = Event> + std::fmt::Debug, Error> {
- let resume_at = resume_at.into();
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.events.subscribe();
@@ -63,7 +62,7 @@ impl<'a> Events<'a> {
.merge_by(channel_events, Sequence::merge)
.merge_by(message_events, Sequence::merge)
.collect::<Vec<_>>();
- let resume_live_at = replay_events.last().map(Sequenced::sequence);
+ let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence);
let replay = stream::iter(replay_events);
@@ -77,7 +76,7 @@ impl<'a> Events<'a> {
Ok(replay.chain(live_messages))
}
- fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
+ fn resume(resume_at: Sequence) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
let filter = Sequence::after(resume_at);
move |event| future::ready(filter(event))
}
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 69c7a10..9996916 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -13,8 +13,6 @@ pub use self::{
sequence::{Instant, Sequence, Sequenced},
};
-pub type ResumePoint = Option<Sequence>;
-
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Event {
diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs
index 22e8762..ceebcc9 100644
--- a/src/event/routes/get.rs
+++ b/src/event/routes/get.rs
@@ -12,7 +12,7 @@ use futures::stream::{Stream, StreamExt as _};
use crate::{
app::App,
error::{Internal, Unauthorized},
- event::{app, extract::LastEventId, Event, ResumePoint, Sequence, Sequenced as _},
+ event::{app, extract::LastEventId, Event, Sequence, Sequenced as _},
token::{app::ValidateError, extract::Identity},
};
@@ -22,9 +22,7 @@ pub async fn handler(
last_event_id: Option<LastEventId<Sequence>>,
Query(query): Query<QueryParams>,
) -> Result<Response<impl Stream<Item = Event> + std::fmt::Debug>, Error> {
- let resume_at = last_event_id
- .map(LastEventId::into_inner)
- .or(query.resume_point);
+ let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner);
let stream = app.events().subscribe(resume_at).await?;
let stream = app.tokens().limit_stream(identity.token, stream).await?;
@@ -32,9 +30,9 @@ pub async fn handler(
Ok(Response(stream))
}
-#[derive(Default, serde::Deserialize)]
+#[derive(serde::Deserialize)]
pub struct QueryParams {
- pub resume_point: ResumePoint,
+ pub resume_point: Sequence,
}
#[derive(Debug)]
diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs
index 6a0a803..0695ab1 100644
--- a/src/event/routes/test/channel.rs
+++ b/src/event/routes/test/channel.rs
@@ -12,14 +12,19 @@ async fn creating() {
// Set up the environment
let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Create a channel
@@ -46,6 +51,7 @@ async fn previously_created() {
// Set up the environment
let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Create a channel
@@ -59,10 +65,14 @@ async fn previously_created() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify channel created event
@@ -81,14 +91,19 @@ async fn expiring() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Expire channels
@@ -113,6 +128,7 @@ async fn previously_expired() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Expire channels
@@ -124,10 +140,14 @@ async fn previously_expired() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for expiry event
let _ = events
@@ -145,14 +165,19 @@ async fn deleting() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Delete the channel
@@ -177,6 +202,7 @@ async fn previously_deleted() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Delete the channel
@@ -188,10 +214,14 @@ async fn previously_deleted() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for expiry event
let _ = events
@@ -209,6 +239,7 @@ async fn previously_purged() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Delete and purge the channel
@@ -225,10 +256,14 @@ async fn previously_purged() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for expiry event
events
diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs
index d24f474..73af62d 100644
--- a/src/event/routes/test/invite.rs
+++ b/src/event/routes/test/invite.rs
@@ -14,14 +14,19 @@ async fn accepting_invite() {
let app = fixtures::scratch_app().await;
let issuer = fixtures::login::create(&app, &fixtures::now()).await;
let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Accept the invite
@@ -50,6 +55,7 @@ async fn previously_accepted_invite() {
let app = fixtures::scratch_app().await;
let issuer = fixtures::login::create(&app, &fixtures::now()).await;
let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Accept the invite
@@ -63,10 +69,14 @@ async fn previously_accepted_invite() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Expect a login created event
diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs
index a7b25fb..fafaeb3 100644
--- a/src/event/routes/test/message.rs
+++ b/src/event/routes/test/message.rs
@@ -16,14 +16,19 @@ async fn sending() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Call the endpoint
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Send a message
@@ -56,6 +61,7 @@ async fn previously_sent() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Send a message
@@ -74,10 +80,14 @@ async fn previously_sent() {
// Call the endpoint
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify that an event is delivered
@@ -96,6 +106,7 @@ async fn sent_in_multiple_channels() {
let app = fixtures::scratch_app().await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
let channels = [
fixtures::channel::create(&app, &fixtures::now()).await,
@@ -115,9 +126,14 @@ async fn sent_in_multiple_channels() {
// Call the endpoint
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
@@ -141,6 +157,7 @@ async fn sent_sequentially() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
let messages = vec![
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
@@ -151,9 +168,14 @@ async fn sent_sequentially() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the expected events in the expected order
@@ -180,14 +202,19 @@ async fn expiring() {
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Expire messages
@@ -214,6 +241,7 @@ async fn previously_expired() {
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Expire messages
@@ -225,10 +253,14 @@ async fn previously_expired() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for expiry event
let _ = events
@@ -248,14 +280,19 @@ async fn deleting() {
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Delete the message
@@ -282,6 +319,7 @@ async fn previously_deleted() {
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Delete the message
@@ -293,10 +331,14 @@ async fn previously_deleted() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for delete event
let _ = events
@@ -316,6 +358,7 @@ async fn previously_purged() {
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Purge the message
@@ -332,10 +375,14 @@ async fn previously_purged() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for delete event
diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs
index 62b9bad..fabda0c 100644
--- a/src/event/routes/test/resume.rs
+++ b/src/event/routes/test/resume.rs
@@ -16,6 +16,7 @@ async fn resume() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
@@ -34,7 +35,7 @@ async fn resume() {
State(app.clone()),
subscriber.clone(),
None,
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -55,7 +56,7 @@ async fn resume() {
State(app),
subscriber,
Some(resume_at.into()),
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -98,6 +99,7 @@ async fn serial_resume() {
let sender = fixtures::login::create(&app, &fixtures::now()).await;
let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Call the endpoint
@@ -115,7 +117,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
None,
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -156,7 +158,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -197,7 +199,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs
index 007b03d..26b7ea7 100644
--- a/src/event/routes/test/setup.rs
+++ b/src/event/routes/test/setup.rs
@@ -15,6 +15,7 @@ async fn previously_completed() {
// Set up the environment
let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Complete initial setup
@@ -28,10 +29,14 @@ async fn previously_completed() {
// Subscribe to events
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Expect a login created event
diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs
index 16ac7c3..fa76865 100644
--- a/src/event/routes/test/token.rs
+++ b/src/event/routes/test/token.rs
@@ -14,6 +14,7 @@ async fn terminates_on_token_expiry() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe via the endpoint
@@ -21,10 +22,14 @@ async fn terminates_on_token_expiry() {
let subscriber =
fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the resulting stream's behaviour
@@ -56,6 +61,7 @@ async fn terminates_on_logout() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe via the endpoint
@@ -65,7 +71,7 @@ async fn terminates_on_logout() {
State(app.clone()),
subscriber.clone(),
None,
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -101,6 +107,7 @@ async fn terminates_on_password_change() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe via the endpoint
@@ -112,7 +119,7 @@ async fn terminates_on_password_change() {
State(app.clone()),
subscriber.clone(),
None,
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
diff --git a/src/event/sequence.rs b/src/event/sequence.rs
index 9bc399b..77281c2 100644
--- a/src/event/sequence.rs
+++ b/src/event/sequence.rs
@@ -1,6 +1,5 @@
use std::fmt;
-use super::ResumePoint;
use crate::clock::DateTime;
#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)]
@@ -51,18 +50,18 @@ impl fmt::Display for Sequence {
}
impl Sequence {
- pub fn up_to<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool
+ pub fn up_to<E>(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool
where
E: Sequenced,
{
- move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point)
+ move |event| event.sequence() <= resume_point
}
- pub fn after<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool
+ pub fn after<E>(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool
where
E: Sequenced,
{
- move |event| resume_point < Some(event.sequence())
+ move |event| resume_point < event.sequence()
}
pub fn start_from<E>(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool