summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-30 02:01:31 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-30 02:01:31 -0400
commit50a382528288248381b07c25719cbc9a519b4c81 (patch)
tree01fc7a2997c3678aa687a75e2e7d56ef0876b450 /src
parent70591c5ac10069a4ae649bd6f79d769da9e32a98 (diff)
Resume points are no longer optional.
This is an inconsequential change for actual clients, since "resume from the beginning" was never a preferred mode of operation, and it simplifies some internals. It should also mean we get better query plans where `coalesce(cond, true)` was previously being used.
Diffstat (limited to 'src')
-rw-r--r--src/boot/app.rs4
-rw-r--r--src/channel/history.rs6
-rw-r--r--src/channel/repo.rs11
-rw-r--r--src/channel/routes/channel/test/post.rs3
-rw-r--r--src/channel/routes/test.rs3
-rw-r--r--src/event/app.rs9
-rw-r--r--src/event/mod.rs2
-rw-r--r--src/event/routes/get.rs10
-rw-r--r--src/event/routes/test/channel.rs91
-rw-r--r--src/event/routes/test/invite.rs26
-rw-r--r--src/event/routes/test/message.rs115
-rw-r--r--src/event/routes/test/resume.rs12
-rw-r--r--src/event/routes/test/setup.rs13
-rw-r--r--src/event/routes/test/token.rs19
-rw-r--r--src/event/sequence.rs9
-rw-r--r--src/login/history.rs6
-rw-r--r--src/login/repo.rs10
-rw-r--r--src/message/history.rs6
-rw-r--r--src/message/repo.rs21
-rw-r--r--src/test/fixtures/boot.rs9
-rw-r--r--src/test/fixtures/mod.rs1
21 files changed, 250 insertions, 136 deletions
diff --git a/src/boot/app.rs b/src/boot/app.rs
index e716b58..909f7d8 100644
--- a/src/boot/app.rs
+++ b/src/boot/app.rs
@@ -22,9 +22,9 @@ impl<'a> Boot<'a> {
let mut tx = self.db.begin().await?;
let resume_point = tx.sequence().current().await?;
- let logins = tx.logins().all(resume_point.into()).await?;
+ let logins = tx.logins().all(resume_point).await?;
let channels = tx.channels().all(resume_point).await?;
- let messages = tx.messages().all(resume_point.into()).await?;
+ let messages = tx.messages().all(resume_point).await?;
tx.commit().await?;
diff --git a/src/channel/history.rs b/src/channel/history.rs
index dda7bb9..ef2120d 100644
--- a/src/channel/history.rs
+++ b/src/channel/history.rs
@@ -4,7 +4,7 @@ use super::{
event::{Created, Deleted, Event},
Channel, Id,
};
-use crate::event::{Instant, ResumePoint, Sequence};
+use crate::event::{Instant, Sequence};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
@@ -28,9 +28,9 @@ impl History {
self.channel.clone()
}
- pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Channel> {
+ pub fn as_of(&self, resume_point: Sequence) -> Option<Channel> {
self.events()
- .filter(Sequence::up_to(resume_point.into()))
+ .filter(Sequence::up_to(resume_point))
.collect()
}
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index f47e564..7206c21 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -5,7 +5,7 @@ use crate::{
channel::{Channel, History, Id},
clock::DateTime,
db::NotFound,
- event::{Instant, ResumePoint, Sequence},
+ event::{Instant, Sequence},
name::{self, Name},
};
@@ -144,13 +144,13 @@ impl<'c> Channels<'c> {
Ok(channels)
}
- pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> {
+ pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
let channels = sqlx::query!(
r#"
select
id as "id: Id",
- name.display_name as "display_name: String",
- name.canonical_name as "canonical_name: String",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
channel.created_at as "created_at: DateTime",
channel.created_sequence as "created_sequence: Sequence",
deleted.deleted_at as "deleted_at?: DateTime",
@@ -160,7 +160,8 @@ impl<'c> Channels<'c> {
using (id)
left join channel_deleted as deleted
using (id)
- where coalesce(channel.created_sequence > $1, true)
+ where channel.created_sequence > $1
+ or deleted.deleted_sequence > $1
"#,
resume_at,
)
diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs
index 111a703..bc0684b 100644
--- a/src/channel/routes/channel/test/post.rs
+++ b/src/channel/routes/channel/test/post.rs
@@ -15,6 +15,7 @@ async fn messages_in_order() {
let app = fixtures::scratch_app().await;
let sender = fixtures::identity::create(&app, &fixtures::now()).await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Call the endpoint (twice)
@@ -41,7 +42,7 @@ async fn messages_in_order() {
let mut events = app
.events()
- .subscribe(None)
+ .subscribe(resume_point)
.await
.expect("subscribing to a valid channel succeeds")
.filter_map(fixtures::event::message)
diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs
index f5369fb..cba8f2e 100644
--- a/src/channel/routes/test.rs
+++ b/src/channel/routes/test.rs
@@ -16,6 +16,7 @@ async fn new_channel() {
let app = fixtures::scratch_app().await;
let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Call the endpoint
@@ -44,7 +45,7 @@ async fn new_channel() {
let mut events = app
.events()
- .subscribe(None)
+ .subscribe(resume_point)
.await
.expect("subscribing never fails")
.filter_map(fixtures::event::channel)
diff --git a/src/event/app.rs b/src/event/app.rs
index c754388..b309245 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -6,7 +6,7 @@ use futures::{
use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;
-use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced};
+use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced};
use crate::{
channel::{self, repo::Provider as _},
login::{self, repo::Provider as _},
@@ -26,9 +26,8 @@ impl<'a> Events<'a> {
pub async fn subscribe(
&self,
- resume_at: impl Into<ResumePoint>,
+ resume_at: Sequence,
) -> Result<impl Stream<Item = Event> + std::fmt::Debug, Error> {
- let resume_at = resume_at.into();
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.events.subscribe();
@@ -63,7 +62,7 @@ impl<'a> Events<'a> {
.merge_by(channel_events, Sequence::merge)
.merge_by(message_events, Sequence::merge)
.collect::<Vec<_>>();
- let resume_live_at = replay_events.last().map(Sequenced::sequence);
+ let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence);
let replay = stream::iter(replay_events);
@@ -77,7 +76,7 @@ impl<'a> Events<'a> {
Ok(replay.chain(live_messages))
}
- fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
+ fn resume(resume_at: Sequence) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
let filter = Sequence::after(resume_at);
move |event| future::ready(filter(event))
}
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 69c7a10..9996916 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -13,8 +13,6 @@ pub use self::{
sequence::{Instant, Sequence, Sequenced},
};
-pub type ResumePoint = Option<Sequence>;
-
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Event {
diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs
index 22e8762..ceebcc9 100644
--- a/src/event/routes/get.rs
+++ b/src/event/routes/get.rs
@@ -12,7 +12,7 @@ use futures::stream::{Stream, StreamExt as _};
use crate::{
app::App,
error::{Internal, Unauthorized},
- event::{app, extract::LastEventId, Event, ResumePoint, Sequence, Sequenced as _},
+ event::{app, extract::LastEventId, Event, Sequence, Sequenced as _},
token::{app::ValidateError, extract::Identity},
};
@@ -22,9 +22,7 @@ pub async fn handler(
last_event_id: Option<LastEventId<Sequence>>,
Query(query): Query<QueryParams>,
) -> Result<Response<impl Stream<Item = Event> + std::fmt::Debug>, Error> {
- let resume_at = last_event_id
- .map(LastEventId::into_inner)
- .or(query.resume_point);
+ let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner);
let stream = app.events().subscribe(resume_at).await?;
let stream = app.tokens().limit_stream(identity.token, stream).await?;
@@ -32,9 +30,9 @@ pub async fn handler(
Ok(Response(stream))
}
-#[derive(Default, serde::Deserialize)]
+#[derive(serde::Deserialize)]
pub struct QueryParams {
- pub resume_point: ResumePoint,
+ pub resume_point: Sequence,
}
#[derive(Debug)]
diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs
index 6a0a803..0695ab1 100644
--- a/src/event/routes/test/channel.rs
+++ b/src/event/routes/test/channel.rs
@@ -12,14 +12,19 @@ async fn creating() {
// Set up the environment
let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Create a channel
@@ -46,6 +51,7 @@ async fn previously_created() {
// Set up the environment
let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Create a channel
@@ -59,10 +65,14 @@ async fn previously_created() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify channel created event
@@ -81,14 +91,19 @@ async fn expiring() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Expire channels
@@ -113,6 +128,7 @@ async fn previously_expired() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Expire channels
@@ -124,10 +140,14 @@ async fn previously_expired() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for expiry event
let _ = events
@@ -145,14 +165,19 @@ async fn deleting() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Delete the channel
@@ -177,6 +202,7 @@ async fn previously_deleted() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Delete the channel
@@ -188,10 +214,14 @@ async fn previously_deleted() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for expiry event
let _ = events
@@ -209,6 +239,7 @@ async fn previously_purged() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Delete and purge the channel
@@ -225,10 +256,14 @@ async fn previously_purged() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for expiry event
events
diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs
index d24f474..73af62d 100644
--- a/src/event/routes/test/invite.rs
+++ b/src/event/routes/test/invite.rs
@@ -14,14 +14,19 @@ async fn accepting_invite() {
let app = fixtures::scratch_app().await;
let issuer = fixtures::login::create(&app, &fixtures::now()).await;
let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Accept the invite
@@ -50,6 +55,7 @@ async fn previously_accepted_invite() {
let app = fixtures::scratch_app().await;
let issuer = fixtures::login::create(&app, &fixtures::now()).await;
let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Accept the invite
@@ -63,10 +69,14 @@ async fn previously_accepted_invite() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Expect a login created event
diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs
index a7b25fb..fafaeb3 100644
--- a/src/event/routes/test/message.rs
+++ b/src/event/routes/test/message.rs
@@ -16,14 +16,19 @@ async fn sending() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Call the endpoint
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Send a message
@@ -56,6 +61,7 @@ async fn previously_sent() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Send a message
@@ -74,10 +80,14 @@ async fn previously_sent() {
// Call the endpoint
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify that an event is delivered
@@ -96,6 +106,7 @@ async fn sent_in_multiple_channels() {
let app = fixtures::scratch_app().await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
let channels = [
fixtures::channel::create(&app, &fixtures::now()).await,
@@ -115,9 +126,14 @@ async fn sent_in_multiple_channels() {
// Call the endpoint
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
@@ -141,6 +157,7 @@ async fn sent_sequentially() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
let messages = vec![
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
@@ -151,9 +168,14 @@ async fn sent_sequentially() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the expected events in the expected order
@@ -180,14 +202,19 @@ async fn expiring() {
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Expire messages
@@ -214,6 +241,7 @@ async fn previously_expired() {
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Expire messages
@@ -225,10 +253,14 @@ async fn previously_expired() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for expiry event
let _ = events
@@ -248,14 +280,19 @@ async fn deleting() {
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Delete the message
@@ -282,6 +319,7 @@ async fn previously_deleted() {
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Delete the message
@@ -293,10 +331,14 @@ async fn previously_deleted() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for delete event
let _ = events
@@ -316,6 +358,7 @@ async fn previously_purged() {
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Purge the message
@@ -332,10 +375,14 @@ async fn previously_purged() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for delete event
diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs
index 62b9bad..fabda0c 100644
--- a/src/event/routes/test/resume.rs
+++ b/src/event/routes/test/resume.rs
@@ -16,6 +16,7 @@ async fn resume() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
@@ -34,7 +35,7 @@ async fn resume() {
State(app.clone()),
subscriber.clone(),
None,
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -55,7 +56,7 @@ async fn resume() {
State(app),
subscriber,
Some(resume_at.into()),
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -98,6 +99,7 @@ async fn serial_resume() {
let sender = fixtures::login::create(&app, &fixtures::now()).await;
let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Call the endpoint
@@ -115,7 +117,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
None,
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -156,7 +158,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -197,7 +199,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs
index 007b03d..26b7ea7 100644
--- a/src/event/routes/test/setup.rs
+++ b/src/event/routes/test/setup.rs
@@ -15,6 +15,7 @@ async fn previously_completed() {
// Set up the environment
let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Complete initial setup
@@ -28,10 +29,14 @@ async fn previously_completed() {
// Subscribe to events
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Expect a login created event
diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs
index 16ac7c3..fa76865 100644
--- a/src/event/routes/test/token.rs
+++ b/src/event/routes/test/token.rs
@@ -14,6 +14,7 @@ async fn terminates_on_token_expiry() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe via the endpoint
@@ -21,10 +22,14 @@ async fn terminates_on_token_expiry() {
let subscriber =
fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the resulting stream's behaviour
@@ -56,6 +61,7 @@ async fn terminates_on_logout() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe via the endpoint
@@ -65,7 +71,7 @@ async fn terminates_on_logout() {
State(app.clone()),
subscriber.clone(),
None,
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -101,6 +107,7 @@ async fn terminates_on_password_change() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe via the endpoint
@@ -112,7 +119,7 @@ async fn terminates_on_password_change() {
State(app.clone()),
subscriber.clone(),
None,
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
diff --git a/src/event/sequence.rs b/src/event/sequence.rs
index 9bc399b..77281c2 100644
--- a/src/event/sequence.rs
+++ b/src/event/sequence.rs
@@ -1,6 +1,5 @@
use std::fmt;
-use super::ResumePoint;
use crate::clock::DateTime;
#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)]
@@ -51,18 +50,18 @@ impl fmt::Display for Sequence {
}
impl Sequence {
- pub fn up_to<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool
+ pub fn up_to<E>(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool
where
E: Sequenced,
{
- move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point)
+ move |event| event.sequence() <= resume_point
}
- pub fn after<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool
+ pub fn after<E>(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool
where
E: Sequenced,
{
- move |event| resume_point < Some(event.sequence())
+ move |event| resume_point < event.sequence()
}
pub fn start_from<E>(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool
diff --git a/src/login/history.rs b/src/login/history.rs
index daad579..8161b0b 100644
--- a/src/login/history.rs
+++ b/src/login/history.rs
@@ -2,7 +2,7 @@ use super::{
event::{Created, Event},
Id, Login,
};
-use crate::event::{Instant, ResumePoint, Sequence};
+use crate::event::{Instant, Sequence};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
@@ -24,9 +24,9 @@ impl History {
self.login.clone()
}
- pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Login> {
+ pub fn as_of(&self, resume_point: Sequence) -> Option<Login> {
self.events()
- .filter(Sequence::up_to(resume_point.into()))
+ .filter(Sequence::up_to(resume_point))
.collect()
}
diff --git a/src/login/repo.rs b/src/login/repo.rs
index 9439a25..1c63a4b 100644
--- a/src/login/repo.rs
+++ b/src/login/repo.rs
@@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use crate::{
clock::DateTime,
- event::{Instant, ResumePoint, Sequence},
+ event::{Instant, Sequence},
login::{password::StoredHash, History, Id, Login},
name::{self, Name},
};
@@ -81,7 +81,7 @@ impl<'c> Logins<'c> {
Ok(())
}
- pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> {
+ pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
let logins = sqlx::query!(
r#"
select
@@ -91,7 +91,7 @@ impl<'c> Logins<'c> {
created_sequence as "created_sequence: Sequence",
created_at as "created_at: DateTime"
from login
- where coalesce(created_sequence <= $1, true)
+ where created_sequence <= $1
order by canonical_name
"#,
resume_at,
@@ -113,7 +113,7 @@ impl<'c> Logins<'c> {
Ok(logins)
}
- pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> {
+ pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
let logins = sqlx::query!(
r#"
select
@@ -123,7 +123,7 @@ impl<'c> Logins<'c> {
created_sequence as "created_sequence: Sequence",
created_at as "created_at: DateTime"
from login
- where coalesce(login.created_sequence > $1, true)
+ where login.created_sequence > $1
"#,
resume_at,
)
diff --git a/src/message/history.rs b/src/message/history.rs
index 67e437a..ed8f5df 100644
--- a/src/message/history.rs
+++ b/src/message/history.rs
@@ -4,7 +4,7 @@ use super::{
event::{Deleted, Event, Sent},
Id, Message,
};
-use crate::event::{Instant, ResumePoint, Sequence};
+use crate::event::{Instant, Sequence};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
@@ -27,9 +27,9 @@ impl History {
self.message.clone()
}
- pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Message> {
+ pub fn as_of(&self, resume_point: Sequence) -> Option<Message> {
self.events()
- .filter(Sequence::up_to(resume_point.into()))
+ .filter(Sequence::up_to(resume_point))
.collect()
}
diff --git a/src/message/repo.rs b/src/message/repo.rs
index c8ceceb..913135c 100644
--- a/src/message/repo.rs
+++ b/src/message/repo.rs
@@ -4,7 +4,7 @@ use super::{snapshot::Message, Body, History, Id};
use crate::{
channel,
clock::DateTime,
- event::{Instant, ResumePoint, Sequence},
+ event::{Instant, Sequence},
login::{self, Login},
};
@@ -106,22 +106,22 @@ impl<'c> Messages<'c> {
Ok(messages)
}
- pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, sqlx::Error> {
let messages = sqlx::query!(
r#"
select
message.channel as "channel: channel::Id",
message.sender as "sender: login::Id",
- id as "id: Id",
+ message.id as "id: Id",
message.body as "body: Body",
message.sent_at as "sent_at: DateTime",
message.sent_sequence as "sent_sequence: Sequence",
- deleted.deleted_at as "deleted_at: DateTime",
- deleted.deleted_sequence as "deleted_sequence: Sequence"
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
from message
left join message_deleted as deleted
using (id)
- where coalesce(message.sent_sequence <= $2, true)
+ where message.sent_sequence <= $1
order by message.sent_sequence
"#,
resume_at,
@@ -282,7 +282,7 @@ impl<'c> Messages<'c> {
Ok(messages)
}
- pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, sqlx::Error> {
let messages = sqlx::query!(
r#"
select
@@ -292,12 +292,13 @@ impl<'c> Messages<'c> {
message.sent_at as "sent_at: DateTime",
message.sent_sequence as "sent_sequence: Sequence",
message.body as "body: Body",
- deleted.deleted_at as "deleted_at: DateTime",
- deleted.deleted_sequence as "deleted_sequence: Sequence"
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
from message
left join message_deleted as deleted
using (id)
- where coalesce(message.sent_sequence > $1, true)
+ where message.sent_sequence > $1
+ or deleted.deleted_sequence > $1
"#,
resume_at,
)
diff --git a/src/test/fixtures/boot.rs b/src/test/fixtures/boot.rs
new file mode 100644
index 0000000..120726f
--- /dev/null
+++ b/src/test/fixtures/boot.rs
@@ -0,0 +1,9 @@
+use crate::{app::App, event::Sequence};
+
+pub async fn resume_point(app: &App) -> Sequence {
+ app.boot()
+ .snapshot()
+ .await
+ .expect("boot always succeeds")
+ .resume_point
+}
diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs
index 2b7b6af..470b31a 100644
--- a/src/test/fixtures/mod.rs
+++ b/src/test/fixtures/mod.rs
@@ -2,6 +2,7 @@ use chrono::{TimeDelta, Utc};
use crate::{app::App, clock::RequestedAt, db};
+pub mod boot;
pub mod channel;
pub mod cookie;
pub mod event;