summaryrefslogtreecommitdiff
path: root/src/event/routes
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-30 02:01:31 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-30 02:01:31 -0400
commit50a382528288248381b07c25719cbc9a519b4c81 (patch)
tree01fc7a2997c3678aa687a75e2e7d56ef0876b450 /src/event/routes
parent70591c5ac10069a4ae649bd6f79d769da9e32a98 (diff)
Resume points are no longer optional.
This is an inconsequential change for actual clients, since "resume from the beginning" was never a preferred mode of operation, and it simplifies some internals. It should also mean we get better query plans where `coalesce(cond, true)` was previously being used.
Diffstat (limited to 'src/event/routes')
-rw-r--r--src/event/routes/get.rs10
-rw-r--r--src/event/routes/test/channel.rs91
-rw-r--r--src/event/routes/test/invite.rs26
-rw-r--r--src/event/routes/test/message.rs115
-rw-r--r--src/event/routes/test/resume.rs12
-rw-r--r--src/event/routes/test/setup.rs13
-rw-r--r--src/event/routes/test/token.rs19
7 files changed, 195 insertions, 91 deletions
diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs
index 22e8762..ceebcc9 100644
--- a/src/event/routes/get.rs
+++ b/src/event/routes/get.rs
@@ -12,7 +12,7 @@ use futures::stream::{Stream, StreamExt as _};
use crate::{
app::App,
error::{Internal, Unauthorized},
- event::{app, extract::LastEventId, Event, ResumePoint, Sequence, Sequenced as _},
+ event::{app, extract::LastEventId, Event, Sequence, Sequenced as _},
token::{app::ValidateError, extract::Identity},
};
@@ -22,9 +22,7 @@ pub async fn handler(
last_event_id: Option<LastEventId<Sequence>>,
Query(query): Query<QueryParams>,
) -> Result<Response<impl Stream<Item = Event> + std::fmt::Debug>, Error> {
- let resume_at = last_event_id
- .map(LastEventId::into_inner)
- .or(query.resume_point);
+ let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner);
let stream = app.events().subscribe(resume_at).await?;
let stream = app.tokens().limit_stream(identity.token, stream).await?;
@@ -32,9 +30,9 @@ pub async fn handler(
Ok(Response(stream))
}
-#[derive(Default, serde::Deserialize)]
+#[derive(serde::Deserialize)]
pub struct QueryParams {
- pub resume_point: ResumePoint,
+ pub resume_point: Sequence,
}
#[derive(Debug)]
diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs
index 6a0a803..0695ab1 100644
--- a/src/event/routes/test/channel.rs
+++ b/src/event/routes/test/channel.rs
@@ -12,14 +12,19 @@ async fn creating() {
// Set up the environment
let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Create a channel
@@ -46,6 +51,7 @@ async fn previously_created() {
// Set up the environment
let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Create a channel
@@ -59,10 +65,14 @@ async fn previously_created() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify channel created event
@@ -81,14 +91,19 @@ async fn expiring() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Expire channels
@@ -113,6 +128,7 @@ async fn previously_expired() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Expire channels
@@ -124,10 +140,14 @@ async fn previously_expired() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for expiry event
let _ = events
@@ -145,14 +165,19 @@ async fn deleting() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Delete the channel
@@ -177,6 +202,7 @@ async fn previously_deleted() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Delete the channel
@@ -188,10 +214,14 @@ async fn previously_deleted() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for expiry event
let _ = events
@@ -209,6 +239,7 @@ async fn previously_purged() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Delete and purge the channel
@@ -225,10 +256,14 @@ async fn previously_purged() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for expiry event
events
diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs
index d24f474..73af62d 100644
--- a/src/event/routes/test/invite.rs
+++ b/src/event/routes/test/invite.rs
@@ -14,14 +14,19 @@ async fn accepting_invite() {
let app = fixtures::scratch_app().await;
let issuer = fixtures::login::create(&app, &fixtures::now()).await;
let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Accept the invite
@@ -50,6 +55,7 @@ async fn previously_accepted_invite() {
let app = fixtures::scratch_app().await;
let issuer = fixtures::login::create(&app, &fixtures::now()).await;
let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Accept the invite
@@ -63,10 +69,14 @@ async fn previously_accepted_invite() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Expect a login created event
diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs
index a7b25fb..fafaeb3 100644
--- a/src/event/routes/test/message.rs
+++ b/src/event/routes/test/message.rs
@@ -16,14 +16,19 @@ async fn sending() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Call the endpoint
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Send a message
@@ -56,6 +61,7 @@ async fn previously_sent() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Send a message
@@ -74,10 +80,14 @@ async fn previously_sent() {
// Call the endpoint
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify that an event is delivered
@@ -96,6 +106,7 @@ async fn sent_in_multiple_channels() {
let app = fixtures::scratch_app().await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
let channels = [
fixtures::channel::create(&app, &fixtures::now()).await,
@@ -115,9 +126,14 @@ async fn sent_in_multiple_channels() {
// Call the endpoint
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
@@ -141,6 +157,7 @@ async fn sent_sequentially() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
let messages = vec![
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
@@ -151,9 +168,14 @@ async fn sent_sequentially() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the expected events in the expected order
@@ -180,14 +202,19 @@ async fn expiring() {
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Expire messages
@@ -214,6 +241,7 @@ async fn previously_expired() {
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Expire messages
@@ -225,10 +253,14 @@ async fn previously_expired() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for expiry event
let _ = events
@@ -248,14 +280,19 @@ async fn deleting() {
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Delete the message
@@ -282,6 +319,7 @@ async fn previously_deleted() {
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Delete the message
@@ -293,10 +331,14 @@ async fn previously_deleted() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for delete event
let _ = events
@@ -316,6 +358,7 @@ async fn previously_purged() {
let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Purge the message
@@ -332,10 +375,14 @@ async fn previously_purged() {
// Subscribe
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Check for delete event
diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs
index 62b9bad..fabda0c 100644
--- a/src/event/routes/test/resume.rs
+++ b/src/event/routes/test/resume.rs
@@ -16,6 +16,7 @@ async fn resume() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
@@ -34,7 +35,7 @@ async fn resume() {
State(app.clone()),
subscriber.clone(),
None,
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -55,7 +56,7 @@ async fn resume() {
State(app),
subscriber,
Some(resume_at.into()),
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -98,6 +99,7 @@ async fn serial_resume() {
let sender = fixtures::login::create(&app, &fixtures::now()).await;
let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Call the endpoint
@@ -115,7 +117,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
None,
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -156,7 +158,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -197,7 +199,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs
index 007b03d..26b7ea7 100644
--- a/src/event/routes/test/setup.rs
+++ b/src/event/routes/test/setup.rs
@@ -15,6 +15,7 @@ async fn previously_completed() {
// Set up the environment
let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Complete initial setup
@@ -28,10 +29,14 @@ async fn previously_completed() {
// Subscribe to events
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Expect a login created event
diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs
index 16ac7c3..fa76865 100644
--- a/src/event/routes/test/token.rs
+++ b/src/event/routes/test/token.rs
@@ -14,6 +14,7 @@ async fn terminates_on_token_expiry() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe via the endpoint
@@ -21,10 +22,14 @@ async fn terminates_on_token_expiry() {
let subscriber =
fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await;
- let get::Response(events) =
- get::handler(State(app.clone()), subscriber, None, Query::default())
- .await
- .expect("subscribe never fails");
+ let get::Response(events) = get::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(get::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the resulting stream's behaviour
@@ -56,6 +61,7 @@ async fn terminates_on_logout() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe via the endpoint
@@ -65,7 +71,7 @@ async fn terminates_on_logout() {
State(app.clone()),
subscriber.clone(),
None,
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");
@@ -101,6 +107,7 @@ async fn terminates_on_password_change() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe via the endpoint
@@ -112,7 +119,7 @@ async fn terminates_on_password_change() {
State(app.clone()),
subscriber.clone(),
None,
- Query::default(),
+ Query(get::QueryParams { resume_point }),
)
.await
.expect("subscribe never fails");