From 70591c5ac10069a4ae649bd6f79d769da9e32a98 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 30 Oct 2024 01:25:04 -0400 Subject: Remove `hi-recanonicalize`. This utility was needed to support a database migration with existing data. I have it on good authority that no further databases exist that are in the state that made this tool necessary. --- src/channel/repo.rs | 32 -------------------------------- 1 file changed, 32 deletions(-) (limited to 'src/channel/repo.rs') diff --git a/src/channel/repo.rs b/src/channel/repo.rs index a49db52..f47e564 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -300,38 +300,6 @@ impl<'c> Channels<'c> { Ok(channels) } - - pub async fn recanonicalize(&mut self) -> Result<(), sqlx::Error> { - let channels = sqlx::query!( - r#" - select - id as "id: Id", - display_name as "display_name: String" - from channel_name - "#, - ) - .fetch_all(&mut *self.0) - .await?; - - for channel in channels { - let name = Name::from(channel.display_name); - let canonical_name = name.canonical(); - - sqlx::query!( - r#" - update channel_name - set canonical_name = $1 - where id = $2 - "#, - canonical_name, - channel.id, - ) - .execute(&mut *self.0) - .await?; - } - - Ok(()) - } } #[derive(Debug, thiserror::Error)] -- cgit v1.2.3 From 50a382528288248381b07c25719cbc9a519b4c81 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 30 Oct 2024 02:01:31 -0400 Subject: Resume points are no longer optional. This is an inconsequential change for actual clients, since "resume from the beginning" was never a preferred mode of operation, and it simplifies some internals. It should also mean we get better query plans where `coalesce(cond, true)` was previously being used. --- ...1fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json | 56 ++++++++++ ...1314f70d13e8b0ca22b7652f1063ec7796cf307269.json | 62 ----------- ...9da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json | 44 ++++++++ ...27d5ba226bf52349548261393e00e74081cdbe041b.json | 62 +++++++++++ ...1602b8befafe751e9caeb0d5f279731e04c6925f46.json | 44 -------- ...c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json | 56 ---------- ...4730699683e63ae1ea404418a17c6af60148f03fe8.json | 44 -------- ...932ace995f53efd6c7800a7a1b42daec41d081b3d2.json | 44 ++++++++ ...0599914331682d58ace57214bfa26ccaa089592a24.json | 62 ----------- ...84a27bdaefca99706a0c73c17f19cc537f3f669882.json | 62 +++++++++++ docs/api/events.md | 2 +- src/boot/app.rs | 4 +- src/channel/history.rs | 6 +- src/channel/repo.rs | 11 +- src/channel/routes/channel/test/post.rs | 3 +- src/channel/routes/test.rs | 3 +- src/event/app.rs | 9 +- src/event/mod.rs | 2 - src/event/routes/get.rs | 10 +- src/event/routes/test/channel.rs | 91 +++++++++++----- src/event/routes/test/invite.rs | 26 +++-- src/event/routes/test/message.rs | 115 +++++++++++++++------ src/event/routes/test/resume.rs | 12 ++- src/event/routes/test/setup.rs | 13 ++- src/event/routes/test/token.rs | 19 ++-- src/event/sequence.rs | 9 +- src/login/history.rs | 6 +- src/login/repo.rs | 10 +- src/message/history.rs | 6 +- src/message/repo.rs | 21 ++-- src/test/fixtures/boot.rs | 9 ++ src/test/fixtures/mod.rs | 1 + 32 files changed, 519 insertions(+), 405 deletions(-) create mode 100644 .sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json delete mode 100644 .sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json create mode 100644 .sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json create mode 100644 .sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json delete mode 100644 .sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json delete mode 100644 .sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json delete mode 100644 .sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json create mode 100644 .sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json delete mode 100644 .sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json create mode 100644 .sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json create mode 100644 src/test/fixtures/boot.rs (limited to 'src/channel/repo.rs') diff --git a/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json new file mode 100644 index 0000000..1d8a2e1 --- /dev/null +++ b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json @@ -0,0 +1,56 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.created_sequence > $1\n or deleted.deleted_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name?: String", + "ordinal": 1, + "type_info": "Null" + }, + { + "name": "canonical_name?: String", + "ordinal": 2, + "type_info": "Null" + }, + { + "name": "created_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 6, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + true, + true, + false, + false, + true, + true + ] + }, + "hash": "093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4" +} diff --git a/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json b/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json deleted file mode 100644 index 9f09a28..0000000 --- a/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where coalesce(message.sent_sequence > $1, true)\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_at: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false, - true, - true, - true - ] - }, - "hash": "24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269" -} diff --git a/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json b/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json new file mode 100644 index 0000000..ae546ad --- /dev/null +++ b/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where login.created_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name: String", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "canonical_name: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "created_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291" +} diff --git a/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json new file mode 100644 index 0000000..5423bfd --- /dev/null +++ b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence > $1\n or deleted.deleted_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel: channel::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + true, + true + ] + }, + "hash": "9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b" +} diff --git a/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json b/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json deleted file mode 100644 index 2d1f49e..0000000 --- a/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where coalesce(created_sequence <= $1, true)\n order by canonical_name\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "created_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46" -} diff --git a/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json b/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json deleted file mode 100644 index 436e1dd..0000000 --- a/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name: String\",\n name.canonical_name as \"canonical_name: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where coalesce(channel.created_sequence > $1, true)\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Null" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Null" - }, - { - "name": "created_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 6, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - true, - true, - false, - false, - true, - true - ] - }, - "hash": "a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd" -} diff --git a/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json b/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json deleted file mode 100644 index 7d9bbbc..0000000 --- a/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where coalesce(login.created_sequence > $1, true)\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "created_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8" -} diff --git a/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json b/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json new file mode 100644 index 0000000..9c3c10e --- /dev/null +++ b/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where created_sequence <= $1\n order by canonical_name\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name: String", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "canonical_name: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "created_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2" +} diff --git a/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json b/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json deleted file mode 100644 index 7aab764..0000000 --- a/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where coalesce(message.sent_sequence <= $2, true)\n order by message.sent_sequence\n ", - "describe": { - "columns": [ - { - "name": "channel: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "id: Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "body: Body", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 5, - "type_info": "Integer" - }, - { - "name": "deleted_at: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - true, - false, - false, - true, - true - ] - }, - "hash": "fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24" -} diff --git a/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json b/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json new file mode 100644 index 0000000..f38f49c --- /dev/null +++ b/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence <= $1\n order by message.sent_sequence\n ", + "describe": { + "columns": [ + { + "name": "channel: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body: Body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 5, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + true, + false, + false, + false, + false + ] + }, + "hash": "ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882" +} diff --git a/docs/api/events.md b/docs/api/events.md index b08e971..b23469c 100644 --- a/docs/api/events.md +++ b/docs/api/events.md @@ -46,7 +46,7 @@ This endpoint is designed for use with the [EventSource] DOM API, and supports s ### Query parameters -This endpoint accepts an optional `resume_point` (integer) query parameter. When provided, the event stream will include events published after that point in time; the value must be the value obtained by calling the [`GET /api/boot`](./boot.md) method. If absent, the returned event stream includes all events. +This endpoint requires a `resume_point` (integer) query parameter. The event stream will collect events published after that point in time. The value must be obtained by calling the [`GET /api/boot`](./boot.md) method. ### Request headers diff --git a/src/boot/app.rs b/src/boot/app.rs index e716b58..909f7d8 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -22,9 +22,9 @@ impl<'a> Boot<'a> { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; - let logins = tx.logins().all(resume_point.into()).await?; + let logins = tx.logins().all(resume_point).await?; let channels = tx.channels().all(resume_point).await?; - let messages = tx.messages().all(resume_point.into()).await?; + let messages = tx.messages().all(resume_point).await?; tx.commit().await?; diff --git a/src/channel/history.rs b/src/channel/history.rs index dda7bb9..ef2120d 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -4,7 +4,7 @@ use super::{ event::{Created, Deleted, Event}, Channel, Id, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -28,9 +28,9 @@ impl History { self.channel.clone() } - pub fn as_of(&self, resume_point: impl Into) -> Option { + pub fn as_of(&self, resume_point: Sequence) -> Option { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/channel/repo.rs b/src/channel/repo.rs index f47e564..7206c21 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -5,7 +5,7 @@ use crate::{ channel::{Channel, History, Id}, clock::DateTime, db::NotFound, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, name::{self, Name}, }; @@ -144,13 +144,13 @@ impl<'c> Channels<'c> { Ok(channels) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, LoadError> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { let channels = sqlx::query!( r#" select id as "id: Id", - name.display_name as "display_name: String", - name.canonical_name as "canonical_name: String", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", @@ -160,7 +160,8 @@ impl<'c> Channels<'c> { using (id) left join channel_deleted as deleted using (id) - where coalesce(channel.created_sequence > $1, true) + where channel.created_sequence > $1 + or deleted.deleted_sequence > $1 "#, resume_at, ) diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs index 111a703..bc0684b 100644 --- a/src/channel/routes/channel/test/post.rs +++ b/src/channel/routes/channel/test/post.rs @@ -15,6 +15,7 @@ async fn messages_in_order() { let app = fixtures::scratch_app().await; let sender = fixtures::identity::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint (twice) @@ -41,7 +42,7 @@ async fn messages_in_order() { let mut events = app .events() - .subscribe(None) + .subscribe(resume_point) .await .expect("subscribing to a valid channel succeeds") .filter_map(fixtures::event::message) diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs index f5369fb..cba8f2e 100644 --- a/src/channel/routes/test.rs +++ b/src/channel/routes/test.rs @@ -16,6 +16,7 @@ async fn new_channel() { let app = fixtures::scratch_app().await; let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -44,7 +45,7 @@ async fn new_channel() { let mut events = app .events() - .subscribe(None) + .subscribe(resume_point) .await .expect("subscribing never fails") .filter_map(fixtures::event::channel) diff --git a/src/event/app.rs b/src/event/app.rs index c754388..b309245 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -6,7 +6,7 @@ use futures::{ use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; -use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced}; +use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, login::{self, repo::Provider as _}, @@ -26,9 +26,8 @@ impl<'a> Events<'a> { pub async fn subscribe( &self, - resume_at: impl Into, + resume_at: Sequence, ) -> Result + std::fmt::Debug, Error> { - let resume_at = resume_at.into(); // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); @@ -63,7 +62,7 @@ impl<'a> Events<'a> { .merge_by(channel_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .collect::>(); - let resume_live_at = replay_events.last().map(Sequenced::sequence); + let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence); let replay = stream::iter(replay_events); @@ -77,7 +76,7 @@ impl<'a> Events<'a> { Ok(replay.chain(live_messages)) } - fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready { + fn resume(resume_at: Sequence) -> impl for<'m> FnMut(&'m Event) -> future::Ready { let filter = Sequence::after(resume_at); move |event| future::ready(filter(event)) } diff --git a/src/event/mod.rs b/src/event/mod.rs index 69c7a10..9996916 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -13,8 +13,6 @@ pub use self::{ sequence::{Instant, Sequence, Sequenced}, }; -pub type ResumePoint = Option; - #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum Event { diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs index 22e8762..ceebcc9 100644 --- a/src/event/routes/get.rs +++ b/src/event/routes/get.rs @@ -12,7 +12,7 @@ use futures::stream::{Stream, StreamExt as _}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{app, extract::LastEventId, Event, ResumePoint, Sequence, Sequenced as _}, + event::{app, extract::LastEventId, Event, Sequence, Sequenced as _}, token::{app::ValidateError, extract::Identity}, }; @@ -22,9 +22,7 @@ pub async fn handler( last_event_id: Option>, Query(query): Query, ) -> Result + std::fmt::Debug>, Error> { - let resume_at = last_event_id - .map(LastEventId::into_inner) - .or(query.resume_point); + let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner); let stream = app.events().subscribe(resume_at).await?; let stream = app.tokens().limit_stream(identity.token, stream).await?; @@ -32,9 +30,9 @@ pub async fn handler( Ok(Response(stream)) } -#[derive(Default, serde::Deserialize)] +#[derive(serde::Deserialize)] pub struct QueryParams { - pub resume_point: ResumePoint, + pub resume_point: Sequence, } #[derive(Debug)] diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs index 6a0a803..0695ab1 100644 --- a/src/event/routes/test/channel.rs +++ b/src/event/routes/test/channel.rs @@ -12,14 +12,19 @@ async fn creating() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Create a channel @@ -46,6 +51,7 @@ async fn previously_created() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Create a channel @@ -59,10 +65,14 @@ async fn previously_created() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify channel created event @@ -81,14 +91,19 @@ async fn expiring() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expire channels @@ -113,6 +128,7 @@ async fn previously_expired() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Expire channels @@ -124,10 +140,14 @@ async fn previously_expired() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -145,14 +165,19 @@ async fn deleting() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Delete the channel @@ -177,6 +202,7 @@ async fn previously_deleted() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete the channel @@ -188,10 +214,14 @@ async fn previously_deleted() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -209,6 +239,7 @@ async fn previously_purged() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete and purge the channel @@ -225,10 +256,14 @@ async fn previously_purged() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event events diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs index d24f474..73af62d 100644 --- a/src/event/routes/test/invite.rs +++ b/src/event/routes/test/invite.rs @@ -14,14 +14,19 @@ async fn accepting_invite() { let app = fixtures::scratch_app().await; let issuer = fixtures::login::create(&app, &fixtures::now()).await; let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Accept the invite @@ -50,6 +55,7 @@ async fn previously_accepted_invite() { let app = fixtures::scratch_app().await; let issuer = fixtures::login::create(&app, &fixtures::now()).await; let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Accept the invite @@ -63,10 +69,14 @@ async fn previously_accepted_invite() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expect a login created event diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs index a7b25fb..fafaeb3 100644 --- a/src/event/routes/test/message.rs +++ b/src/event/routes/test/message.rs @@ -16,14 +16,19 @@ async fn sending() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Send a message @@ -56,6 +61,7 @@ async fn previously_sent() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Send a message @@ -74,10 +80,14 @@ async fn previously_sent() { // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify that an event is delivered @@ -96,6 +106,7 @@ async fn sent_in_multiple_channels() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let channels = [ fixtures::channel::create(&app, &fixtures::now()).await, @@ -115,9 +126,14 @@ async fn sent_in_multiple_channels() { // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the structure of the response. @@ -141,6 +157,7 @@ async fn sent_sequentially() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let messages = vec![ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, @@ -151,9 +168,14 @@ async fn sent_sequentially() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the expected events in the expected order @@ -180,14 +202,19 @@ async fn expiring() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expire messages @@ -214,6 +241,7 @@ async fn previously_expired() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Expire messages @@ -225,10 +253,14 @@ async fn previously_expired() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -248,14 +280,19 @@ async fn deleting() { let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Delete the message @@ -282,6 +319,7 @@ async fn previously_deleted() { let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete the message @@ -293,10 +331,14 @@ async fn previously_deleted() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for delete event let _ = events @@ -316,6 +358,7 @@ async fn previously_purged() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Purge the message @@ -332,10 +375,14 @@ async fn previously_purged() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for delete event diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs index 62b9bad..fabda0c 100644 --- a/src/event/routes/test/resume.rs +++ b/src/event/routes/test/resume.rs @@ -16,6 +16,7 @@ async fn resume() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; @@ -34,7 +35,7 @@ async fn resume() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -55,7 +56,7 @@ async fn resume() { State(app), subscriber, Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -98,6 +99,7 @@ async fn serial_resume() { let sender = fixtures::login::create(&app, &fixtures::now()).await; let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -115,7 +117,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -156,7 +158,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -197,7 +199,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs index 007b03d..26b7ea7 100644 --- a/src/event/routes/test/setup.rs +++ b/src/event/routes/test/setup.rs @@ -15,6 +15,7 @@ async fn previously_completed() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Complete initial setup @@ -28,10 +29,14 @@ async fn previously_completed() { // Subscribe to events let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expect a login created event diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs index 16ac7c3..fa76865 100644 --- a/src/event/routes/test/token.rs +++ b/src/event/routes/test/token.rs @@ -14,6 +14,7 @@ async fn terminates_on_token_expiry() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -21,10 +22,14 @@ async fn terminates_on_token_expiry() { let subscriber = fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the resulting stream's behaviour @@ -56,6 +61,7 @@ async fn terminates_on_logout() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -65,7 +71,7 @@ async fn terminates_on_logout() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -101,6 +107,7 @@ async fn terminates_on_password_change() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -112,7 +119,7 @@ async fn terminates_on_password_change() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); diff --git a/src/event/sequence.rs b/src/event/sequence.rs index 9bc399b..77281c2 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -1,6 +1,5 @@ use std::fmt; -use super::ResumePoint; use crate::clock::DateTime; #[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] @@ -51,18 +50,18 @@ impl fmt::Display for Sequence { } impl Sequence { - pub fn up_to(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool + pub fn up_to(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { - move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point) + move |event| event.sequence() <= resume_point } - pub fn after(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool + pub fn after(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { - move |event| resume_point < Some(event.sequence()) + move |event| resume_point < event.sequence() } pub fn start_from(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool diff --git a/src/login/history.rs b/src/login/history.rs index daad579..8161b0b 100644 --- a/src/login/history.rs +++ b/src/login/history.rs @@ -2,7 +2,7 @@ use super::{ event::{Created, Event}, Id, Login, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -24,9 +24,9 @@ impl History { self.login.clone() } - pub fn as_of(&self, resume_point: impl Into) -> Option { + pub fn as_of(&self, resume_point: Sequence) -> Option { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/login/repo.rs b/src/login/repo.rs index 9439a25..1c63a4b 100644 --- a/src/login/repo.rs +++ b/src/login/repo.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ clock::DateTime, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, login::{password::StoredHash, History, Id, Login}, name::{self, Name}, }; @@ -81,7 +81,7 @@ impl<'c> Logins<'c> { Ok(()) } - pub async fn all(&mut self, resume_at: ResumePoint) -> Result, LoadError> { + pub async fn all(&mut self, resume_at: Sequence) -> Result, LoadError> { let logins = sqlx::query!( r#" select @@ -91,7 +91,7 @@ impl<'c> Logins<'c> { created_sequence as "created_sequence: Sequence", created_at as "created_at: DateTime" from login - where coalesce(created_sequence <= $1, true) + where created_sequence <= $1 order by canonical_name "#, resume_at, @@ -113,7 +113,7 @@ impl<'c> Logins<'c> { Ok(logins) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, LoadError> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { let logins = sqlx::query!( r#" select @@ -123,7 +123,7 @@ impl<'c> Logins<'c> { created_sequence as "created_sequence: Sequence", created_at as "created_at: DateTime" from login - where coalesce(login.created_sequence > $1, true) + where login.created_sequence > $1 "#, resume_at, ) diff --git a/src/message/history.rs b/src/message/history.rs index 67e437a..ed8f5df 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -4,7 +4,7 @@ use super::{ event::{Deleted, Event, Sent}, Id, Message, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -27,9 +27,9 @@ impl History { self.message.clone() } - pub fn as_of(&self, resume_point: impl Into) -> Option { + pub fn as_of(&self, resume_point: Sequence) -> Option { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/message/repo.rs b/src/message/repo.rs index c8ceceb..913135c 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -4,7 +4,7 @@ use super::{snapshot::Message, Body, History, Id}; use crate::{ channel, clock::DateTime, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, login::{self, Login}, }; @@ -106,22 +106,22 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn all(&mut self, resume_at: ResumePoint) -> Result, sqlx::Error> { + pub async fn all(&mut self, resume_at: Sequence) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select message.channel as "channel: channel::Id", message.sender as "sender: login::Id", - id as "id: Id", + message.id as "id: Id", message.body as "body: Body", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", - deleted.deleted_at as "deleted_at: DateTime", - deleted.deleted_sequence as "deleted_sequence: Sequence" + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) - where coalesce(message.sent_sequence <= $2, true) + where message.sent_sequence <= $1 order by message.sent_sequence "#, resume_at, @@ -282,7 +282,7 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, sqlx::Error> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select @@ -292,12 +292,13 @@ impl<'c> Messages<'c> { message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", message.body as "body: Body", - deleted.deleted_at as "deleted_at: DateTime", - deleted.deleted_sequence as "deleted_sequence: Sequence" + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) - where coalesce(message.sent_sequence > $1, true) + where message.sent_sequence > $1 + or deleted.deleted_sequence > $1 "#, resume_at, ) diff --git a/src/test/fixtures/boot.rs b/src/test/fixtures/boot.rs new file mode 100644 index 0000000..120726f --- /dev/null +++ b/src/test/fixtures/boot.rs @@ -0,0 +1,9 @@ +use crate::{app::App, event::Sequence}; + +pub async fn resume_point(app: &App) -> Sequence { + app.boot() + .snapshot() + .await + .expect("boot always succeeds") + .resume_point +} diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 2b7b6af..470b31a 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -2,6 +2,7 @@ use chrono::{TimeDelta, Utc}; use crate::{app::App, clock::RequestedAt, db}; +pub mod boot; pub mod channel; pub mod cookie; pub mod event; -- cgit v1.2.3 From 06c839436900ce07ec5c53175b01f3c5011e507c Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 30 Oct 2024 11:32:52 -0400 Subject: Track an index-friendly sequence range for both channels and messages. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is meant to limit the amount of messages that event replay needs to examine. Previously, the query required a table scan; table scans on `message` can be quite large, and should really be avoided. The new schema allows replays to be carried out using an index scan. The premise of this change is that, for each (channel, message), there is a range of event sequence numbers that the (channel, message) may appear in. We'll notate that as `[start, end]` in the general case, but they are: * for active channels, `[ch.created_sequence, ch.created_sequence]`. * for deleted channels, `[ch.created_sequence, ch_del.deleted_sequence]`. * for active messages, `[mg.sent_sequence, mg.sent_sequence]`. * for deleted messages, `[mg.sent_seqeunce, mg_del.deleted_sequence]`. (The two "active" ranges may grow in future releases, to support things like channel renames and message editing. That won't change the logic, but those two things will need to update the new `last_sequence` field.) There are two families of operations that need to retrieve based on these ranges: * Boot creates a snapshot as of a specific `resume_at` sequence number, and thus must include any record whose `start` falls on or before `resume_at`. We can't exclude records whose `end` is also before it, as their terminal state may be one that is included in boot (eg. active channels). * Event replay needs to include any events that fall after the same `resume_at`, and thus must include events from any record whose `end` falls strictly after `resume_at`. We can't exclude records whose `start` is also strictly after `resume_at`, as we'd omit them from replay, inappropriately, if we did. This gives three interesting cases: 1. Record fully after `resume_at`: event sequence --increasing--> x-a … x … x+k … resume_at start end This record should be omitted by boot, but included for event replay. 2. Record fully before `resume_at`: event sequence --increasing--> x … x+k … x+a start end resume_at This record should be omitted for event replay, but included for boot. 3. Record overlapping `resume_at`: event sequence --increasing--> x … x+a … x+k start resume_at end This record needs to be included for both boot and event replay. However, the bounds of that range were previously stored in two tables (`channel` and `channel_deleted`, or `message` and `message_deleted`, respectively), which sqlite (indeed most SQL implementations) cannot index. This forced a table scan, leading to the program considering every possible (channel, message) during event replay. This commit adds a `last_sequence` field to channels and messages, which is set to the above values as channels and messages are operated on. This field is indexed, and queries can use it to rapidly identify relevant rows for event replay, cutting down the amount of reading needed to generate events on resume. --- ...1fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json | 56 -------- ...32389259b91bbffa182e32b224635031eead2fa82d.json | 50 -------- ...ae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json | 20 +++ ...9075e71bd7e30dc93d32e1f273c878f18f2984860f.json | 62 +++++++++ ...3a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json | 20 +++ ...41e4322f8026f9e2515b6bacaed81f6248c52a198a.json | 50 ++++++++ ...27d5ba226bf52349548261393e00e74081cdbe041b.json | 62 --------- ...e4a2679aaa4c3f28aa5436a9af067a754e46af5589.json | 56 ++++++++ ...ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json | 12 ++ ...ae5ca13b72436960622863420d3f1d73a422fe5b42.json | 12 -- ...50121451dbcf01e8bec8a987b58c699b27b5d737af.json | 12 -- ...0241030152013_channel_message_last_event_id.sql | 141 +++++++++++++++++++++ src/channel/repo.rs | 21 ++- src/message/repo.rs | 16 ++- 14 files changed, 387 insertions(+), 203 deletions(-) delete mode 100644 .sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json delete mode 100644 .sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json create mode 100644 .sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json create mode 100644 .sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json create mode 100644 .sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json create mode 100644 .sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json delete mode 100644 .sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json create mode 100644 .sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json create mode 100644 .sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json delete mode 100644 .sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json delete mode 100644 .sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json create mode 100644 migrations/20241030152013_channel_message_last_event_id.sql (limited to 'src/channel/repo.rs') diff --git a/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json deleted file mode 100644 index 1d8a2e1..0000000 --- a/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.created_sequence > $1\n or deleted.deleted_sequence > $1\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name?: String", - "ordinal": 1, - "type_info": "Null" - }, - { - "name": "canonical_name?: String", - "ordinal": 2, - "type_info": "Null" - }, - { - "name": "created_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 6, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - true, - true, - false, - false, - true, - true - ] - }, - "hash": "093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4" -} diff --git a/.sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json b/.sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json deleted file mode 100644 index fd5a165..0000000 --- a/.sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert into message\n (id, channel, sender, sent_at, sent_sequence, body)\n values ($1, $2, $3, $4, $5, $6)\n returning\n id as \"id: Id\",\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n body as \"body: Body\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - } - ], - "parameters": { - "Right": 6 - }, - "nullable": [ - false, - false, - false, - false, - false, - true - ] - }, - "hash": "0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d" -} diff --git a/.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json b/.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json new file mode 100644 index 0000000..902b216 --- /dev/null +++ b/.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n update channel\n set last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false + ] + }, + "hash": "4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38" +} diff --git a/.sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json b/.sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json new file mode 100644 index 0000000..7ec6aac --- /dev/null +++ b/.sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.last_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel: channel::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + false, + false + ] + }, + "hash": "53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f" +} diff --git a/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json b/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json new file mode 100644 index 0000000..5179e74 --- /dev/null +++ b/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n update message\n set body = '', last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false + ] + }, + "hash": "64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4" +} diff --git a/.sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json b/.sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json new file mode 100644 index 0000000..eb30352 --- /dev/null +++ b/.sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json @@ -0,0 +1,50 @@ +{ + "db_name": "SQLite", + "query": "\n insert into message\n (id, channel, sender, sent_at, sent_sequence, body, last_sequence)\n values ($1, $2, $3, $4, $5, $6, $7)\n returning\n id as \"id: Id\",\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n body as \"body: Body\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel: channel::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + } + ], + "parameters": { + "Right": 7 + }, + "nullable": [ + false, + false, + false, + false, + false, + true + ] + }, + "hash": "72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a" +} diff --git a/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json deleted file mode 100644 index 5423bfd..0000000 --- a/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence > $1\n or deleted.deleted_sequence > $1\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false, - true, - true, - true - ] - }, - "hash": "9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b" -} diff --git a/.sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json b/.sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json new file mode 100644 index 0000000..37d685a --- /dev/null +++ b/.sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json @@ -0,0 +1,56 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.last_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name?: String", + "ordinal": 1, + "type_info": "Null" + }, + { + "name": "canonical_name?: String", + "ordinal": 2, + "type_info": "Null" + }, + { + "name": "created_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 6, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589" +} diff --git a/.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json b/.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json new file mode 100644 index 0000000..0118249 --- /dev/null +++ b/.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n insert\n into channel (id, created_at, created_sequence, last_sequence)\n values ($1, $2, $3, $4)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 4 + }, + "nullable": [] + }, + "hash": "ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b" +} diff --git a/.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json b/.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json deleted file mode 100644 index 658728c..0000000 --- a/.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert\n into channel (id, created_at, created_sequence)\n values ($1, $2, $3)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 3 - }, - "nullable": [] - }, - "hash": "d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42" -} diff --git a/.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json b/.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json deleted file mode 100644 index 0c21ec1..0000000 --- a/.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n update message\n set body = ''\n where id = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 1 - }, - "nullable": [] - }, - "hash": "e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af" -} diff --git a/migrations/20241030152013_channel_message_last_event_id.sql b/migrations/20241030152013_channel_message_last_event_id.sql new file mode 100644 index 0000000..dd6e66b --- /dev/null +++ b/migrations/20241030152013_channel_message_last_event_id.sql @@ -0,0 +1,141 @@ +alter table channel +rename to old_channel; +alter table channel_name +rename to old_channel_name; +alter table channel_deleted +rename to old_channel_deleted; +alter table message +rename to old_message; +alter table message_deleted +rename to old_message_deleted; + +create table channel ( + id text + not null + primary key, + created_sequence bigint + unique + not null, + created_at text + not null, + last_sequence bigint + not null +); + +insert into channel (id, created_sequence, created_at, last_sequence) +select + ch.id, + ch.created_sequence, + ch.created_at, + max(ch.created_sequence, coalesce(del.deleted_sequence, ch.created_sequence)) as last_seqeuence +from old_channel as ch +left join old_channel_deleted as del + using (id); + +create table channel_name ( + id text + not null + primary key + references channel (id), + display_name + not null, + canonical_name + not null + unique +); + +insert into channel_name (id, display_name, canonical_name) +select id, display_name, canonical_name +from old_channel_name; + +create table channel_deleted ( + id text + not null + primary key + references channel (id), + deleted_sequence bigint + unique + not null, + deleted_at text + not null +); + +insert into channel_deleted (id, deleted_sequence, deleted_at) +select id, deleted_sequence, deleted_at +from old_channel_deleted; + +create table message ( + id text + not null + primary key, + channel text + not null + references channel (id), + sender text + not null + references login (id), + sent_sequence bigint + unique + not null, + sent_at text + not null, + body text + null, + last_sequence bigint + not null +); + +insert into message (id, channel, sender, sent_sequence, sent_at, body, last_sequence) +select + msg.id, + msg.channel, + msg.sender, + msg.sent_sequence, + msg.sent_at, + msg.body, + max(msg.sent_sequence, coalesce(del.deleted_sequence, msg.sent_sequence)) as last_sequence +from + old_message as msg + left join old_message_deleted as del + using (id); + +create table message_deleted ( + id text + not null + primary key + references message (id), + deleted_sequence bigint + unique + not null, + deleted_at text + not null +); + +insert into message_deleted (id, deleted_sequence, deleted_at) +select id, deleted_sequence, deleted_at +from old_message_deleted; + +drop table old_message_deleted; +drop table old_message; +drop table old_channel_deleted; +drop table old_channel_name; +drop table old_channel; + +-- recreate existing indices +create index message_sent_at +on message (sent_at); +create index message_deleted_deleted_at +on message_deleted (deleted_at); +create index message_channel +on message(channel); +create index channel_created_sequence +on channel (created_sequence); +create index channel_created_at +on channel (created_at); + +-- new indices +create index channel_last_sequence +on channel (last_sequence); + +create index message_last_sequence +on message (last_sequence); diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 7206c21..6612151 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -32,12 +32,13 @@ impl<'c> Channels<'c> { sqlx::query!( r#" insert - into channel (id, created_at, created_sequence) - values ($1, $2, $3) + into channel (id, created_at, created_sequence, last_sequence) + values ($1, $2, $3, $4) "#, id, created.at, created.sequence, + created.sequence, ) .execute(&mut *self.0) .await?; @@ -160,8 +161,7 @@ impl<'c> Channels<'c> { using (id) left join channel_deleted as deleted using (id) - where channel.created_sequence > $1 - or deleted.deleted_sequence > $1 + where channel.last_sequence > $1 "#, resume_at, ) @@ -190,6 +190,19 @@ impl<'c> Channels<'c> { deleted: &Instant, ) -> Result { let id = channel.id(); + sqlx::query!( + r#" + update channel + set last_sequence = max(last_sequence, $1) + where id = $2 + returning id as "id: Id" + "#, + deleted.sequence, + id, + ) + .fetch_one(&mut *self.0) + .await?; + sqlx::query!( r#" insert into channel_deleted (id, deleted_at, deleted_sequence) diff --git a/src/message/repo.rs b/src/message/repo.rs index 913135c..14f8eaf 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -34,8 +34,8 @@ impl<'c> Messages<'c> { let message = sqlx::query!( r#" insert into message - (id, channel, sender, sent_at, sent_sequence, body) - values ($1, $2, $3, $4, $5, $6) + (id, channel, sender, sent_at, sent_sequence, body, last_sequence) + values ($1, $2, $3, $4, $5, $6, $7) returning id as "id: Id", channel as "channel: channel::Id", @@ -50,6 +50,7 @@ impl<'c> Messages<'c> { sent.at, sent.sequence, body, + sent.sequence, ) .map(|row| History { message: Message { @@ -205,12 +206,14 @@ impl<'c> Messages<'c> { sqlx::query!( r#" update message - set body = '' - where id = $1 + set body = '', last_sequence = max(last_sequence, $1) + where id = $2 + returning id as "id: Id" "#, + deleted.sequence, id, ) - .execute(&mut *self.0) + .fetch_one(&mut *self.0) .await?; let message = self.by_id(id).await?; @@ -297,8 +300,7 @@ impl<'c> Messages<'c> { from message left join message_deleted as deleted using (id) - where message.sent_sequence > $1 - or deleted.deleted_sequence > $1 + where message.last_sequence > $1 "#, resume_at, ) -- cgit v1.2.3