diff options
27 files changed, 269 insertions, 155 deletions
diff --git a/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json index 436e1dd..1d8a2e1 100644 --- a/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json +++ b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name: String\",\n name.canonical_name as \"canonical_name: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where coalesce(channel.created_sequence > $1, true)\n ", + "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.created_sequence > $1\n or deleted.deleted_sequence > $1\n ", "describe": { "columns": [ { @@ -9,12 +9,12 @@ "type_info": "Text" }, { - "name": "display_name: String", + "name": "display_name?: String", "ordinal": 1, "type_info": "Null" }, { - "name": "canonical_name: String", + "name": "canonical_name?: String", "ordinal": 2, "type_info": "Null" }, @@ -52,5 +52,5 @@ true ] }, - "hash": "a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd" + "hash": "093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4" } diff --git a/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json b/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json index 7d9bbbc..ae546ad 100644 --- a/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json +++ b/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where coalesce(login.created_sequence > $1, true)\n ", + "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where login.created_sequence > $1\n ", "describe": { "columns": [ { @@ -40,5 +40,5 @@ false ] }, - "hash": "ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8" + "hash": "2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291" } diff --git a/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json index 9f09a28..5423bfd 100644 --- a/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json +++ b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where coalesce(message.sent_sequence > $1, true)\n ", + "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence > $1\n or deleted.deleted_sequence > $1\n ", "describe": { "columns": [ { @@ -34,12 +34,12 @@ "type_info": "Text" }, { - "name": "deleted_at: DateTime", + "name": "deleted_at?: DateTime", "ordinal": 6, "type_info": "Text" }, { - "name": "deleted_sequence: Sequence", + "name": "deleted_sequence?: Sequence", "ordinal": 7, "type_info": "Integer" } @@ -58,5 +58,5 @@ true ] }, - "hash": "24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269" + "hash": "9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b" } diff --git a/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json b/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json index 2d1f49e..9c3c10e 100644 --- a/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json +++ b/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where coalesce(created_sequence <= $1, true)\n order by canonical_name\n ", + "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where created_sequence <= $1\n order by canonical_name\n ", "describe": { "columns": [ { @@ -40,5 +40,5 @@ false ] }, - "hash": "9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46" + "hash": "cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2" } diff --git a/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json b/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json index 7aab764..f38f49c 100644 --- a/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json +++ b/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n select\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where coalesce(message.sent_sequence <= $2, true)\n order by message.sent_sequence\n ", + "query": "\n select\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence <= $1\n order by message.sent_sequence\n ", "describe": { "columns": [ { @@ -34,12 +34,12 @@ "type_info": "Integer" }, { - "name": "deleted_at: DateTime", + "name": "deleted_at?: DateTime", "ordinal": 6, "type_info": "Text" }, { - "name": "deleted_sequence: Sequence", + "name": "deleted_sequence?: Sequence", "ordinal": 7, "type_info": "Integer" } @@ -54,9 +54,9 @@ true, false, false, - true, - true + false, + false ] }, - "hash": "fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24" + "hash": "ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882" } diff --git a/docs/api/events.md b/docs/api/events.md index b08e971..b23469c 100644 --- a/docs/api/events.md +++ b/docs/api/events.md @@ -46,7 +46,7 @@ This endpoint is designed for use with the [EventSource] DOM API, and supports s ### Query parameters -This endpoint accepts an optional `resume_point` (integer) query parameter. When provided, the event stream will include events published after that point in time; the value must be the value obtained by calling the [`GET /api/boot`](./boot.md) method. If absent, the returned event stream includes all events. +This endpoint requires a `resume_point` (integer) query parameter. The event stream will collect events published after that point in time. The value must be obtained by calling the [`GET /api/boot`](./boot.md) method. ### Request headers diff --git a/src/boot/app.rs b/src/boot/app.rs index e716b58..909f7d8 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -22,9 +22,9 @@ impl<'a> Boot<'a> { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; - let logins = tx.logins().all(resume_point.into()).await?; + let logins = tx.logins().all(resume_point).await?; let channels = tx.channels().all(resume_point).await?; - let messages = tx.messages().all(resume_point.into()).await?; + let messages = tx.messages().all(resume_point).await?; tx.commit().await?; diff --git a/src/channel/history.rs b/src/channel/history.rs index dda7bb9..ef2120d 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -4,7 +4,7 @@ use super::{ event::{Created, Deleted, Event}, Channel, Id, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -28,9 +28,9 @@ impl History { self.channel.clone() } - pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Channel> { + pub fn as_of(&self, resume_point: Sequence) -> Option<Channel> { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/channel/repo.rs b/src/channel/repo.rs index f47e564..7206c21 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -5,7 +5,7 @@ use crate::{ channel::{Channel, History, Id}, clock::DateTime, db::NotFound, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, name::{self, Name}, }; @@ -144,13 +144,13 @@ impl<'c> Channels<'c> { Ok(channels) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { let channels = sqlx::query!( r#" select id as "id: Id", - name.display_name as "display_name: String", - name.canonical_name as "canonical_name: String", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", @@ -160,7 +160,8 @@ impl<'c> Channels<'c> { using (id) left join channel_deleted as deleted using (id) - where coalesce(channel.created_sequence > $1, true) + where channel.created_sequence > $1 + or deleted.deleted_sequence > $1 "#, resume_at, ) diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs index 111a703..bc0684b 100644 --- a/src/channel/routes/channel/test/post.rs +++ b/src/channel/routes/channel/test/post.rs @@ -15,6 +15,7 @@ async fn messages_in_order() { let app = fixtures::scratch_app().await; let sender = fixtures::identity::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint (twice) @@ -41,7 +42,7 @@ async fn messages_in_order() { let mut events = app .events() - .subscribe(None) + .subscribe(resume_point) .await .expect("subscribing to a valid channel succeeds") .filter_map(fixtures::event::message) diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs index f5369fb..cba8f2e 100644 --- a/src/channel/routes/test.rs +++ b/src/channel/routes/test.rs @@ -16,6 +16,7 @@ async fn new_channel() { let app = fixtures::scratch_app().await; let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -44,7 +45,7 @@ async fn new_channel() { let mut events = app .events() - .subscribe(None) + .subscribe(resume_point) .await .expect("subscribing never fails") .filter_map(fixtures::event::channel) diff --git a/src/event/app.rs b/src/event/app.rs index c754388..b309245 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -6,7 +6,7 @@ use futures::{ use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; -use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced}; +use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, login::{self, repo::Provider as _}, @@ -26,9 +26,8 @@ impl<'a> Events<'a> { pub async fn subscribe( &self, - resume_at: impl Into<ResumePoint>, + resume_at: Sequence, ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, Error> { - let resume_at = resume_at.into(); // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); @@ -63,7 +62,7 @@ impl<'a> Events<'a> { .merge_by(channel_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .collect::<Vec<_>>(); - let resume_live_at = replay_events.last().map(Sequenced::sequence); + let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence); let replay = stream::iter(replay_events); @@ -77,7 +76,7 @@ impl<'a> Events<'a> { Ok(replay.chain(live_messages)) } - fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { + fn resume(resume_at: Sequence) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { let filter = Sequence::after(resume_at); move |event| future::ready(filter(event)) } diff --git a/src/event/mod.rs b/src/event/mod.rs index 69c7a10..9996916 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -13,8 +13,6 @@ pub use self::{ sequence::{Instant, Sequence, Sequenced}, }; -pub type ResumePoint = Option<Sequence>; - #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum Event { diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs index 22e8762..ceebcc9 100644 --- a/src/event/routes/get.rs +++ b/src/event/routes/get.rs @@ -12,7 +12,7 @@ use futures::stream::{Stream, StreamExt as _}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{app, extract::LastEventId, Event, ResumePoint, Sequence, Sequenced as _}, + event::{app, extract::LastEventId, Event, Sequence, Sequenced as _}, token::{app::ValidateError, extract::Identity}, }; @@ -22,9 +22,7 @@ pub async fn handler( last_event_id: Option<LastEventId<Sequence>>, Query(query): Query<QueryParams>, ) -> Result<Response<impl Stream<Item = Event> + std::fmt::Debug>, Error> { - let resume_at = last_event_id - .map(LastEventId::into_inner) - .or(query.resume_point); + let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner); let stream = app.events().subscribe(resume_at).await?; let stream = app.tokens().limit_stream(identity.token, stream).await?; @@ -32,9 +30,9 @@ pub async fn handler( Ok(Response(stream)) } -#[derive(Default, serde::Deserialize)] +#[derive(serde::Deserialize)] pub struct QueryParams { - pub resume_point: ResumePoint, + pub resume_point: Sequence, } #[derive(Debug)] diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs index 6a0a803..0695ab1 100644 --- a/src/event/routes/test/channel.rs +++ b/src/event/routes/test/channel.rs @@ -12,14 +12,19 @@ async fn creating() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Create a channel @@ -46,6 +51,7 @@ async fn previously_created() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Create a channel @@ -59,10 +65,14 @@ async fn previously_created() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify channel created event @@ -81,14 +91,19 @@ async fn expiring() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expire channels @@ -113,6 +128,7 @@ async fn previously_expired() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Expire channels @@ -124,10 +140,14 @@ async fn previously_expired() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -145,14 +165,19 @@ async fn deleting() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Delete the channel @@ -177,6 +202,7 @@ async fn previously_deleted() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete the channel @@ -188,10 +214,14 @@ async fn previously_deleted() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -209,6 +239,7 @@ async fn previously_purged() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete and purge the channel @@ -225,10 +256,14 @@ async fn previously_purged() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event events diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs index d24f474..73af62d 100644 --- a/src/event/routes/test/invite.rs +++ b/src/event/routes/test/invite.rs @@ -14,14 +14,19 @@ async fn accepting_invite() { let app = fixtures::scratch_app().await; let issuer = fixtures::login::create(&app, &fixtures::now()).await; let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Accept the invite @@ -50,6 +55,7 @@ async fn previously_accepted_invite() { let app = fixtures::scratch_app().await; let issuer = fixtures::login::create(&app, &fixtures::now()).await; let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Accept the invite @@ -63,10 +69,14 @@ async fn previously_accepted_invite() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expect a login created event diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs index a7b25fb..fafaeb3 100644 --- a/src/event/routes/test/message.rs +++ b/src/event/routes/test/message.rs @@ -16,14 +16,19 @@ async fn sending() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Send a message @@ -56,6 +61,7 @@ async fn previously_sent() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Send a message @@ -74,10 +80,14 @@ async fn previously_sent() { // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify that an event is delivered @@ -96,6 +106,7 @@ async fn sent_in_multiple_channels() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let channels = [ fixtures::channel::create(&app, &fixtures::now()).await, @@ -115,9 +126,14 @@ async fn sent_in_multiple_channels() { // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the structure of the response. @@ -141,6 +157,7 @@ async fn sent_sequentially() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let messages = vec![ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, @@ -151,9 +168,14 @@ async fn sent_sequentially() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the expected events in the expected order @@ -180,14 +202,19 @@ async fn expiring() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expire messages @@ -214,6 +241,7 @@ async fn previously_expired() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Expire messages @@ -225,10 +253,14 @@ async fn previously_expired() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -248,14 +280,19 @@ async fn deleting() { let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Delete the message @@ -282,6 +319,7 @@ async fn previously_deleted() { let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete the message @@ -293,10 +331,14 @@ async fn previously_deleted() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for delete event let _ = events @@ -316,6 +358,7 @@ async fn previously_purged() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Purge the message @@ -332,10 +375,14 @@ async fn previously_purged() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for delete event diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs index 62b9bad..fabda0c 100644 --- a/src/event/routes/test/resume.rs +++ b/src/event/routes/test/resume.rs @@ -16,6 +16,7 @@ async fn resume() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; @@ -34,7 +35,7 @@ async fn resume() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -55,7 +56,7 @@ async fn resume() { State(app), subscriber, Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -98,6 +99,7 @@ async fn serial_resume() { let sender = fixtures::login::create(&app, &fixtures::now()).await; let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -115,7 +117,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -156,7 +158,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -197,7 +199,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs index 007b03d..26b7ea7 100644 --- a/src/event/routes/test/setup.rs +++ b/src/event/routes/test/setup.rs @@ -15,6 +15,7 @@ async fn previously_completed() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Complete initial setup @@ -28,10 +29,14 @@ async fn previously_completed() { // Subscribe to events let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expect a login created event diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs index 16ac7c3..fa76865 100644 --- a/src/event/routes/test/token.rs +++ b/src/event/routes/test/token.rs @@ -14,6 +14,7 @@ async fn terminates_on_token_expiry() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -21,10 +22,14 @@ async fn terminates_on_token_expiry() { let subscriber = fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the resulting stream's behaviour @@ -56,6 +61,7 @@ async fn terminates_on_logout() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -65,7 +71,7 @@ async fn terminates_on_logout() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -101,6 +107,7 @@ async fn terminates_on_password_change() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -112,7 +119,7 @@ async fn terminates_on_password_change() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); diff --git a/src/event/sequence.rs b/src/event/sequence.rs index 9bc399b..77281c2 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -1,6 +1,5 @@ use std::fmt; -use super::ResumePoint; use crate::clock::DateTime; #[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] @@ -51,18 +50,18 @@ impl fmt::Display for Sequence { } impl Sequence { - pub fn up_to<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool + pub fn up_to<E>(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { - move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point) + move |event| event.sequence() <= resume_point } - pub fn after<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool + pub fn after<E>(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { - move |event| resume_point < Some(event.sequence()) + move |event| resume_point < event.sequence() } pub fn start_from<E>(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool diff --git a/src/login/history.rs b/src/login/history.rs index daad579..8161b0b 100644 --- a/src/login/history.rs +++ b/src/login/history.rs @@ -2,7 +2,7 @@ use super::{ event::{Created, Event}, Id, Login, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -24,9 +24,9 @@ impl History { self.login.clone() } - pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Login> { + pub fn as_of(&self, resume_point: Sequence) -> Option<Login> { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/login/repo.rs b/src/login/repo.rs index 9439a25..1c63a4b 100644 --- a/src/login/repo.rs +++ b/src/login/repo.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ clock::DateTime, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, login::{password::StoredHash, History, Id, Login}, name::{self, Name}, }; @@ -81,7 +81,7 @@ impl<'c> Logins<'c> { Ok(()) } - pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> { + pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { let logins = sqlx::query!( r#" select @@ -91,7 +91,7 @@ impl<'c> Logins<'c> { created_sequence as "created_sequence: Sequence", created_at as "created_at: DateTime" from login - where coalesce(created_sequence <= $1, true) + where created_sequence <= $1 order by canonical_name "#, resume_at, @@ -113,7 +113,7 @@ impl<'c> Logins<'c> { Ok(logins) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { let logins = sqlx::query!( r#" select @@ -123,7 +123,7 @@ impl<'c> Logins<'c> { created_sequence as "created_sequence: Sequence", created_at as "created_at: DateTime" from login - where coalesce(login.created_sequence > $1, true) + where login.created_sequence > $1 "#, resume_at, ) diff --git a/src/message/history.rs b/src/message/history.rs index 67e437a..ed8f5df 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -4,7 +4,7 @@ use super::{ event::{Deleted, Event, Sent}, Id, Message, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -27,9 +27,9 @@ impl History { self.message.clone() } - pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Message> { + pub fn as_of(&self, resume_point: Sequence) -> Option<Message> { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/message/repo.rs b/src/message/repo.rs index c8ceceb..913135c 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -4,7 +4,7 @@ use super::{snapshot::Message, Body, History, Id}; use crate::{ channel, clock::DateTime, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, login::{self, Login}, }; @@ -106,22 +106,22 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { + pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, sqlx::Error> { let messages = sqlx::query!( r#" select message.channel as "channel: channel::Id", message.sender as "sender: login::Id", - id as "id: Id", + message.id as "id: Id", message.body as "body: Body", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", - deleted.deleted_at as "deleted_at: DateTime", - deleted.deleted_sequence as "deleted_sequence: Sequence" + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) - where coalesce(message.sent_sequence <= $2, true) + where message.sent_sequence <= $1 order by message.sent_sequence "#, resume_at, @@ -282,7 +282,7 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, sqlx::Error> { let messages = sqlx::query!( r#" select @@ -292,12 +292,13 @@ impl<'c> Messages<'c> { message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", message.body as "body: Body", - deleted.deleted_at as "deleted_at: DateTime", - deleted.deleted_sequence as "deleted_sequence: Sequence" + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) - where coalesce(message.sent_sequence > $1, true) + where message.sent_sequence > $1 + or deleted.deleted_sequence > $1 "#, resume_at, ) diff --git a/src/test/fixtures/boot.rs b/src/test/fixtures/boot.rs new file mode 100644 index 0000000..120726f --- /dev/null +++ b/src/test/fixtures/boot.rs @@ -0,0 +1,9 @@ +use crate::{app::App, event::Sequence}; + +pub async fn resume_point(app: &App) -> Sequence { + app.boot() + .snapshot() + .await + .expect("boot always succeeds") + .resume_point +} diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 2b7b6af..470b31a 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -2,6 +2,7 @@ use chrono::{TimeDelta, Utc}; use crate::{app::App, clock::RequestedAt, db}; +pub mod boot; pub mod channel; pub mod cookie; pub mod event; |
