From 06c839436900ce07ec5c53175b01f3c5011e507c Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 30 Oct 2024 11:32:52 -0400 Subject: Track an index-friendly sequence range for both channels and messages. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is meant to limit the amount of messages that event replay needs to examine. Previously, the query required a table scan; table scans on `message` can be quite large, and should really be avoided. The new schema allows replays to be carried out using an index scan. The premise of this change is that, for each (channel, message), there is a range of event sequence numbers that the (channel, message) may appear in. We'll notate that as `[start, end]` in the general case, but they are: * for active channels, `[ch.created_sequence, ch.created_sequence]`. * for deleted channels, `[ch.created_sequence, ch_del.deleted_sequence]`. * for active messages, `[mg.sent_sequence, mg.sent_sequence]`. * for deleted messages, `[mg.sent_seqeunce, mg_del.deleted_sequence]`. (The two "active" ranges may grow in future releases, to support things like channel renames and message editing. That won't change the logic, but those two things will need to update the new `last_sequence` field.) There are two families of operations that need to retrieve based on these ranges: * Boot creates a snapshot as of a specific `resume_at` sequence number, and thus must include any record whose `start` falls on or before `resume_at`. We can't exclude records whose `end` is also before it, as their terminal state may be one that is included in boot (eg. active channels). * Event replay needs to include any events that fall after the same `resume_at`, and thus must include events from any record whose `end` falls strictly after `resume_at`. We can't exclude records whose `start` is also strictly after `resume_at`, as we'd omit them from replay, inappropriately, if we did. This gives three interesting cases: 1. Record fully after `resume_at`: event sequence --increasing--> x-a … x … x+k … resume_at start end This record should be omitted by boot, but included for event replay. 2. Record fully before `resume_at`: event sequence --increasing--> x … x+k … x+a start end resume_at This record should be omitted for event replay, but included for boot. 3. Record overlapping `resume_at`: event sequence --increasing--> x … x+a … x+k start resume_at end This record needs to be included for both boot and event replay. However, the bounds of that range were previously stored in two tables (`channel` and `channel_deleted`, or `message` and `message_deleted`, respectively), which sqlite (indeed most SQL implementations) cannot index. This forced a table scan, leading to the program considering every possible (channel, message) during event replay. This commit adds a `last_sequence` field to channels and messages, which is set to the above values as channels and messages are operated on. This field is indexed, and queries can use it to rapidly identify relevant rows for event replay, cutting down the amount of reading needed to generate events on resume. --- ...1fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json | 56 -------- ...32389259b91bbffa182e32b224635031eead2fa82d.json | 50 -------- ...ae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json | 20 +++ ...9075e71bd7e30dc93d32e1f273c878f18f2984860f.json | 62 +++++++++ ...3a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json | 20 +++ ...41e4322f8026f9e2515b6bacaed81f6248c52a198a.json | 50 ++++++++ ...27d5ba226bf52349548261393e00e74081cdbe041b.json | 62 --------- ...e4a2679aaa4c3f28aa5436a9af067a754e46af5589.json | 56 ++++++++ ...ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json | 12 ++ ...ae5ca13b72436960622863420d3f1d73a422fe5b42.json | 12 -- ...50121451dbcf01e8bec8a987b58c699b27b5d737af.json | 12 -- ...0241030152013_channel_message_last_event_id.sql | 141 +++++++++++++++++++++ src/channel/repo.rs | 21 ++- src/message/repo.rs | 16 ++- 14 files changed, 387 insertions(+), 203 deletions(-) delete mode 100644 .sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json delete mode 100644 .sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json create mode 100644 .sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json create mode 100644 .sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json create mode 100644 .sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json create mode 100644 .sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json delete mode 100644 .sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json create mode 100644 .sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json create mode 100644 .sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json delete mode 100644 .sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json delete mode 100644 .sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json create mode 100644 migrations/20241030152013_channel_message_last_event_id.sql diff --git a/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json deleted file mode 100644 index 1d8a2e1..0000000 --- a/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.created_sequence > $1\n or deleted.deleted_sequence > $1\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name?: String", - "ordinal": 1, - "type_info": "Null" - }, - { - "name": "canonical_name?: String", - "ordinal": 2, - "type_info": "Null" - }, - { - "name": "created_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 6, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - true, - true, - false, - false, - true, - true - ] - }, - "hash": "093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4" -} diff --git a/.sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json b/.sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json deleted file mode 100644 index fd5a165..0000000 --- a/.sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert into message\n (id, channel, sender, sent_at, sent_sequence, body)\n values ($1, $2, $3, $4, $5, $6)\n returning\n id as \"id: Id\",\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n body as \"body: Body\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - } - ], - "parameters": { - "Right": 6 - }, - "nullable": [ - false, - false, - false, - false, - false, - true - ] - }, - "hash": "0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d" -} diff --git a/.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json b/.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json new file mode 100644 index 0000000..902b216 --- /dev/null +++ b/.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n update channel\n set last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false + ] + }, + "hash": "4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38" +} diff --git a/.sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json b/.sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json new file mode 100644 index 0000000..7ec6aac --- /dev/null +++ b/.sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.last_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel: channel::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + false, + false + ] + }, + "hash": "53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f" +} diff --git a/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json b/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json new file mode 100644 index 0000000..5179e74 --- /dev/null +++ b/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n update message\n set body = '', last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false + ] + }, + "hash": "64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4" +} diff --git a/.sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json b/.sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json new file mode 100644 index 0000000..eb30352 --- /dev/null +++ b/.sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json @@ -0,0 +1,50 @@ +{ + "db_name": "SQLite", + "query": "\n insert into message\n (id, channel, sender, sent_at, sent_sequence, body, last_sequence)\n values ($1, $2, $3, $4, $5, $6, $7)\n returning\n id as \"id: Id\",\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n body as \"body: Body\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel: channel::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + } + ], + "parameters": { + "Right": 7 + }, + "nullable": [ + false, + false, + false, + false, + false, + true + ] + }, + "hash": "72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a" +} diff --git a/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json deleted file mode 100644 index 5423bfd..0000000 --- a/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence > $1\n or deleted.deleted_sequence > $1\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false, - true, - true, - true - ] - }, - "hash": "9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b" -} diff --git a/.sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json b/.sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json new file mode 100644 index 0000000..37d685a --- /dev/null +++ b/.sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json @@ -0,0 +1,56 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.last_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name?: String", + "ordinal": 1, + "type_info": "Null" + }, + { + "name": "canonical_name?: String", + "ordinal": 2, + "type_info": "Null" + }, + { + "name": "created_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 6, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589" +} diff --git a/.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json b/.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json new file mode 100644 index 0000000..0118249 --- /dev/null +++ b/.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n insert\n into channel (id, created_at, created_sequence, last_sequence)\n values ($1, $2, $3, $4)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 4 + }, + "nullable": [] + }, + "hash": "ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b" +} diff --git a/.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json b/.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json deleted file mode 100644 index 658728c..0000000 --- a/.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert\n into channel (id, created_at, created_sequence)\n values ($1, $2, $3)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 3 - }, - "nullable": [] - }, - "hash": "d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42" -} diff --git a/.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json b/.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json deleted file mode 100644 index 0c21ec1..0000000 --- a/.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n update message\n set body = ''\n where id = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 1 - }, - "nullable": [] - }, - "hash": "e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af" -} diff --git a/migrations/20241030152013_channel_message_last_event_id.sql b/migrations/20241030152013_channel_message_last_event_id.sql new file mode 100644 index 0000000..dd6e66b --- /dev/null +++ b/migrations/20241030152013_channel_message_last_event_id.sql @@ -0,0 +1,141 @@ +alter table channel +rename to old_channel; +alter table channel_name +rename to old_channel_name; +alter table channel_deleted +rename to old_channel_deleted; +alter table message +rename to old_message; +alter table message_deleted +rename to old_message_deleted; + +create table channel ( + id text + not null + primary key, + created_sequence bigint + unique + not null, + created_at text + not null, + last_sequence bigint + not null +); + +insert into channel (id, created_sequence, created_at, last_sequence) +select + ch.id, + ch.created_sequence, + ch.created_at, + max(ch.created_sequence, coalesce(del.deleted_sequence, ch.created_sequence)) as last_seqeuence +from old_channel as ch +left join old_channel_deleted as del + using (id); + +create table channel_name ( + id text + not null + primary key + references channel (id), + display_name + not null, + canonical_name + not null + unique +); + +insert into channel_name (id, display_name, canonical_name) +select id, display_name, canonical_name +from old_channel_name; + +create table channel_deleted ( + id text + not null + primary key + references channel (id), + deleted_sequence bigint + unique + not null, + deleted_at text + not null +); + +insert into channel_deleted (id, deleted_sequence, deleted_at) +select id, deleted_sequence, deleted_at +from old_channel_deleted; + +create table message ( + id text + not null + primary key, + channel text + not null + references channel (id), + sender text + not null + references login (id), + sent_sequence bigint + unique + not null, + sent_at text + not null, + body text + null, + last_sequence bigint + not null +); + +insert into message (id, channel, sender, sent_sequence, sent_at, body, last_sequence) +select + msg.id, + msg.channel, + msg.sender, + msg.sent_sequence, + msg.sent_at, + msg.body, + max(msg.sent_sequence, coalesce(del.deleted_sequence, msg.sent_sequence)) as last_sequence +from + old_message as msg + left join old_message_deleted as del + using (id); + +create table message_deleted ( + id text + not null + primary key + references message (id), + deleted_sequence bigint + unique + not null, + deleted_at text + not null +); + +insert into message_deleted (id, deleted_sequence, deleted_at) +select id, deleted_sequence, deleted_at +from old_message_deleted; + +drop table old_message_deleted; +drop table old_message; +drop table old_channel_deleted; +drop table old_channel_name; +drop table old_channel; + +-- recreate existing indices +create index message_sent_at +on message (sent_at); +create index message_deleted_deleted_at +on message_deleted (deleted_at); +create index message_channel +on message(channel); +create index channel_created_sequence +on channel (created_sequence); +create index channel_created_at +on channel (created_at); + +-- new indices +create index channel_last_sequence +on channel (last_sequence); + +create index message_last_sequence +on message (last_sequence); diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 7206c21..6612151 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -32,12 +32,13 @@ impl<'c> Channels<'c> { sqlx::query!( r#" insert - into channel (id, created_at, created_sequence) - values ($1, $2, $3) + into channel (id, created_at, created_sequence, last_sequence) + values ($1, $2, $3, $4) "#, id, created.at, created.sequence, + created.sequence, ) .execute(&mut *self.0) .await?; @@ -160,8 +161,7 @@ impl<'c> Channels<'c> { using (id) left join channel_deleted as deleted using (id) - where channel.created_sequence > $1 - or deleted.deleted_sequence > $1 + where channel.last_sequence > $1 "#, resume_at, ) @@ -190,6 +190,19 @@ impl<'c> Channels<'c> { deleted: &Instant, ) -> Result { let id = channel.id(); + sqlx::query!( + r#" + update channel + set last_sequence = max(last_sequence, $1) + where id = $2 + returning id as "id: Id" + "#, + deleted.sequence, + id, + ) + .fetch_one(&mut *self.0) + .await?; + sqlx::query!( r#" insert into channel_deleted (id, deleted_at, deleted_sequence) diff --git a/src/message/repo.rs b/src/message/repo.rs index 913135c..14f8eaf 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -34,8 +34,8 @@ impl<'c> Messages<'c> { let message = sqlx::query!( r#" insert into message - (id, channel, sender, sent_at, sent_sequence, body) - values ($1, $2, $3, $4, $5, $6) + (id, channel, sender, sent_at, sent_sequence, body, last_sequence) + values ($1, $2, $3, $4, $5, $6, $7) returning id as "id: Id", channel as "channel: channel::Id", @@ -50,6 +50,7 @@ impl<'c> Messages<'c> { sent.at, sent.sequence, body, + sent.sequence, ) .map(|row| History { message: Message { @@ -205,12 +206,14 @@ impl<'c> Messages<'c> { sqlx::query!( r#" update message - set body = '' - where id = $1 + set body = '', last_sequence = max(last_sequence, $1) + where id = $2 + returning id as "id: Id" "#, + deleted.sequence, id, ) - .execute(&mut *self.0) + .fetch_one(&mut *self.0) .await?; let message = self.by_id(id).await?; @@ -297,8 +300,7 @@ impl<'c> Messages<'c> { from message left join message_deleted as deleted using (id) - where message.sent_sequence > $1 - or deleted.deleted_sequence > $1 + where message.last_sequence > $1 "#, resume_at, ) -- cgit v1.2.3