summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-30 11:32:52 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-30 12:09:41 -0400
commit06c839436900ce07ec5c53175b01f3c5011e507c (patch)
treec9c1d2045982d65427b5345ae91609f4d224edb8
parent50a382528288248381b07c25719cbc9a519b4c81 (diff)
Track an index-friendly sequence range for both channels and messages.
This is meant to limit the amount of messages that event replay needs to examine. Previously, the query required a table scan; table scans on `message` can be quite large, and should really be avoided. The new schema allows replays to be carried out using an index scan. The premise of this change is that, for each (channel, message), there is a range of event sequence numbers that the (channel, message) may appear in. We'll notate that as `[start, end]` in the general case, but they are: * for active channels, `[ch.created_sequence, ch.created_sequence]`. * for deleted channels, `[ch.created_sequence, ch_del.deleted_sequence]`. * for active messages, `[mg.sent_sequence, mg.sent_sequence]`. * for deleted messages, `[mg.sent_seqeunce, mg_del.deleted_sequence]`. (The two "active" ranges may grow in future releases, to support things like channel renames and message editing. That won't change the logic, but those two things will need to update the new `last_sequence` field.) There are two families of operations that need to retrieve based on these ranges: * Boot creates a snapshot as of a specific `resume_at` sequence number, and thus must include any record whose `start` falls on or before `resume_at`. We can't exclude records whose `end` is also before it, as their terminal state may be one that is included in boot (eg. active channels). * Event replay needs to include any events that fall after the same `resume_at`, and thus must include events from any record whose `end` falls strictly after `resume_at`. We can't exclude records whose `start` is also strictly after `resume_at`, as we'd omit them from replay, inappropriately, if we did. This gives three interesting cases: 1. Record fully after `resume_at`: event sequence --increasing--> x-a … x … x+k … resume_at start end This record should be omitted by boot, but included for event replay. 2. Record fully before `resume_at`: event sequence --increasing--> x … x+k … x+a start end resume_at This record should be omitted for event replay, but included for boot. 3. Record overlapping `resume_at`: event sequence --increasing--> x … x+a … x+k start resume_at end This record needs to be included for both boot and event replay. However, the bounds of that range were previously stored in two tables (`channel` and `channel_deleted`, or `message` and `message_deleted`, respectively), which sqlite (indeed most SQL implementations) cannot index. This forced a table scan, leading to the program considering every possible (channel, message) during event replay. This commit adds a `last_sequence` field to channels and messages, which is set to the above values as channels and messages are operated on. This field is indexed, and queries can use it to rapidly identify relevant rows for event replay, cutting down the amount of reading needed to generate events on resume.
-rw-r--r--.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json20
-rw-r--r--.sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json (renamed from .sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json)8
-rw-r--r--.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json20
-rw-r--r--.sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json (renamed from .sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json)6
-rw-r--r--.sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json (renamed from .sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json)12
-rw-r--r--.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json12
-rw-r--r--.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json12
-rw-r--r--.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json12
-rw-r--r--migrations/20241030152013_channel_message_last_event_id.sql141
-rw-r--r--src/channel/repo.rs21
-rw-r--r--src/message/repo.rs16
11 files changed, 232 insertions, 48 deletions
diff --git a/.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json b/.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json
new file mode 100644
index 0000000..902b216
--- /dev/null
+++ b/.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json
@@ -0,0 +1,20 @@
+{
+ "db_name": "SQLite",
+ "query": "\n update channel\n set last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "id: Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ }
+ ],
+ "parameters": {
+ "Right": 2
+ },
+ "nullable": [
+ false
+ ]
+ },
+ "hash": "4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38"
+}
diff --git a/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json b/.sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json
index 5423bfd..7ec6aac 100644
--- a/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json
+++ b/.sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence > $1\n or deleted.deleted_sequence > $1\n ",
+ "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.last_sequence > $1\n ",
"describe": {
"columns": [
{
@@ -54,9 +54,9 @@
false,
false,
true,
- true,
- true
+ false,
+ false
]
},
- "hash": "9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b"
+ "hash": "53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f"
}
diff --git a/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json b/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json
new file mode 100644
index 0000000..5179e74
--- /dev/null
+++ b/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json
@@ -0,0 +1,20 @@
+{
+ "db_name": "SQLite",
+ "query": "\n update message\n set body = '', last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "id: Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ }
+ ],
+ "parameters": {
+ "Right": 2
+ },
+ "nullable": [
+ false
+ ]
+ },
+ "hash": "64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4"
+}
diff --git a/.sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json b/.sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json
index fd5a165..eb30352 100644
--- a/.sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json
+++ b/.sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n insert into message\n (id, channel, sender, sent_at, sent_sequence, body)\n values ($1, $2, $3, $4, $5, $6)\n returning\n id as \"id: Id\",\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n body as \"body: Body\"\n ",
+ "query": "\n insert into message\n (id, channel, sender, sent_at, sent_sequence, body, last_sequence)\n values ($1, $2, $3, $4, $5, $6, $7)\n returning\n id as \"id: Id\",\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n body as \"body: Body\"\n ",
"describe": {
"columns": [
{
@@ -35,7 +35,7 @@
}
],
"parameters": {
- "Right": 6
+ "Right": 7
},
"nullable": [
false,
@@ -46,5 +46,5 @@
true
]
},
- "hash": "0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d"
+ "hash": "72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a"
}
diff --git a/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json b/.sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json
index 1d8a2e1..37d685a 100644
--- a/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json
+++ b/.sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.created_sequence > $1\n or deleted.deleted_sequence > $1\n ",
+ "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.last_sequence > $1\n ",
"describe": {
"columns": [
{
@@ -44,13 +44,13 @@
},
"nullable": [
false,
- true,
- true,
false,
false,
- true,
- true
+ false,
+ false,
+ false,
+ false
]
},
- "hash": "093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4"
+ "hash": "c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589"
}
diff --git a/.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json b/.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json
new file mode 100644
index 0000000..0118249
--- /dev/null
+++ b/.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json
@@ -0,0 +1,12 @@
+{
+ "db_name": "SQLite",
+ "query": "\n insert\n into channel (id, created_at, created_sequence, last_sequence)\n values ($1, $2, $3, $4)\n ",
+ "describe": {
+ "columns": [],
+ "parameters": {
+ "Right": 4
+ },
+ "nullable": []
+ },
+ "hash": "ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b"
+}
diff --git a/.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json b/.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json
deleted file mode 100644
index 658728c..0000000
--- a/.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
- "db_name": "SQLite",
- "query": "\n insert\n into channel (id, created_at, created_sequence)\n values ($1, $2, $3)\n ",
- "describe": {
- "columns": [],
- "parameters": {
- "Right": 3
- },
- "nullable": []
- },
- "hash": "d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42"
-}
diff --git a/.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json b/.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json
deleted file mode 100644
index 0c21ec1..0000000
--- a/.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
- "db_name": "SQLite",
- "query": "\n update message\n set body = ''\n where id = $1\n ",
- "describe": {
- "columns": [],
- "parameters": {
- "Right": 1
- },
- "nullable": []
- },
- "hash": "e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af"
-}
diff --git a/migrations/20241030152013_channel_message_last_event_id.sql b/migrations/20241030152013_channel_message_last_event_id.sql
new file mode 100644
index 0000000..dd6e66b
--- /dev/null
+++ b/migrations/20241030152013_channel_message_last_event_id.sql
@@ -0,0 +1,141 @@
+alter table channel
+rename to old_channel;
+alter table channel_name
+rename to old_channel_name;
+alter table channel_deleted
+rename to old_channel_deleted;
+alter table message
+rename to old_message;
+alter table message_deleted
+rename to old_message_deleted;
+
+create table channel (
+ id text
+ not null
+ primary key,
+ created_sequence bigint
+ unique
+ not null,
+ created_at text
+ not null,
+ last_sequence bigint
+ not null
+);
+
+insert into channel (id, created_sequence, created_at, last_sequence)
+select
+ ch.id,
+ ch.created_sequence,
+ ch.created_at,
+ max(ch.created_sequence, coalesce(del.deleted_sequence, ch.created_sequence)) as last_seqeuence
+from old_channel as ch
+left join old_channel_deleted as del
+ using (id);
+
+create table channel_name (
+ id text
+ not null
+ primary key
+ references channel (id),
+ display_name
+ not null,
+ canonical_name
+ not null
+ unique
+);
+
+insert into channel_name (id, display_name, canonical_name)
+select id, display_name, canonical_name
+from old_channel_name;
+
+create table channel_deleted (
+ id text
+ not null
+ primary key
+ references channel (id),
+ deleted_sequence bigint
+ unique
+ not null,
+ deleted_at text
+ not null
+);
+
+insert into channel_deleted (id, deleted_sequence, deleted_at)
+select id, deleted_sequence, deleted_at
+from old_channel_deleted;
+
+create table message (
+ id text
+ not null
+ primary key,
+ channel text
+ not null
+ references channel (id),
+ sender text
+ not null
+ references login (id),
+ sent_sequence bigint
+ unique
+ not null,
+ sent_at text
+ not null,
+ body text
+ null,
+ last_sequence bigint
+ not null
+);
+
+insert into message (id, channel, sender, sent_sequence, sent_at, body, last_sequence)
+select
+ msg.id,
+ msg.channel,
+ msg.sender,
+ msg.sent_sequence,
+ msg.sent_at,
+ msg.body,
+ max(msg.sent_sequence, coalesce(del.deleted_sequence, msg.sent_sequence)) as last_sequence
+from
+ old_message as msg
+ left join old_message_deleted as del
+ using (id);
+
+create table message_deleted (
+ id text
+ not null
+ primary key
+ references message (id),
+ deleted_sequence bigint
+ unique
+ not null,
+ deleted_at text
+ not null
+);
+
+insert into message_deleted (id, deleted_sequence, deleted_at)
+select id, deleted_sequence, deleted_at
+from old_message_deleted;
+
+drop table old_message_deleted;
+drop table old_message;
+drop table old_channel_deleted;
+drop table old_channel_name;
+drop table old_channel;
+
+-- recreate existing indices
+create index message_sent_at
+on message (sent_at);
+create index message_deleted_deleted_at
+on message_deleted (deleted_at);
+create index message_channel
+on message(channel);
+create index channel_created_sequence
+on channel (created_sequence);
+create index channel_created_at
+on channel (created_at);
+
+-- new indices
+create index channel_last_sequence
+on channel (last_sequence);
+
+create index message_last_sequence
+on message (last_sequence);
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index 7206c21..6612151 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -32,12 +32,13 @@ impl<'c> Channels<'c> {
sqlx::query!(
r#"
insert
- into channel (id, created_at, created_sequence)
- values ($1, $2, $3)
+ into channel (id, created_at, created_sequence, last_sequence)
+ values ($1, $2, $3, $4)
"#,
id,
created.at,
created.sequence,
+ created.sequence,
)
.execute(&mut *self.0)
.await?;
@@ -160,8 +161,7 @@ impl<'c> Channels<'c> {
using (id)
left join channel_deleted as deleted
using (id)
- where channel.created_sequence > $1
- or deleted.deleted_sequence > $1
+ where channel.last_sequence > $1
"#,
resume_at,
)
@@ -192,6 +192,19 @@ impl<'c> Channels<'c> {
let id = channel.id();
sqlx::query!(
r#"
+ update channel
+ set last_sequence = max(last_sequence, $1)
+ where id = $2
+ returning id as "id: Id"
+ "#,
+ deleted.sequence,
+ id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
insert into channel_deleted (id, deleted_at, deleted_sequence)
values ($1, $2, $3)
"#,
diff --git a/src/message/repo.rs b/src/message/repo.rs
index 913135c..14f8eaf 100644
--- a/src/message/repo.rs
+++ b/src/message/repo.rs
@@ -34,8 +34,8 @@ impl<'c> Messages<'c> {
let message = sqlx::query!(
r#"
insert into message
- (id, channel, sender, sent_at, sent_sequence, body)
- values ($1, $2, $3, $4, $5, $6)
+ (id, channel, sender, sent_at, sent_sequence, body, last_sequence)
+ values ($1, $2, $3, $4, $5, $6, $7)
returning
id as "id: Id",
channel as "channel: channel::Id",
@@ -50,6 +50,7 @@ impl<'c> Messages<'c> {
sent.at,
sent.sequence,
body,
+ sent.sequence,
)
.map(|row| History {
message: Message {
@@ -205,12 +206,14 @@ impl<'c> Messages<'c> {
sqlx::query!(
r#"
update message
- set body = ''
- where id = $1
+ set body = '', last_sequence = max(last_sequence, $1)
+ where id = $2
+ returning id as "id: Id"
"#,
+ deleted.sequence,
id,
)
- .execute(&mut *self.0)
+ .fetch_one(&mut *self.0)
.await?;
let message = self.by_id(id).await?;
@@ -297,8 +300,7 @@ impl<'c> Messages<'c> {
from message
left join message_deleted as deleted
using (id)
- where message.sent_sequence > $1
- or deleted.deleted_sequence > $1
+ where message.last_sequence > $1
"#,
resume_at,
)