summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-09 11:45:46 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-09 11:45:46 -0400
commitfecc78192ff1ad83c6a2f41e35a65ac189d25c6f (patch)
tree481d82e99cf8aad8fe256d8186ae72bcee23bf9f
parentdd62b823e01934a0f841256fdb17b551091896bf (diff)
parent2f0b77e8fd02a137047c8975a573626cd76310ff (diff)
Merge branch 'wip/event-vocabulary'
-rw-r--r--.sqlx/query-191255b9e55c9b36d0fd047e56e515d121964d3481e48aae3558b53a5123ce7d.json50
-rw-r--r--.sqlx/query-3fbec32aeb32c49e088f246c00151035dcf174cec137326b63e0cb0e4ae5cb60.json38
-rw-r--r--.sqlx/query-4224d5c1c4009e0d31b96bc7b1d9f6a2215c7c135720c1222170a1f6692c3a8a.json38
-rw-r--r--.sqlx/query-45449846ea98e892c6e58f2ba8e082c21c9e3d124e5340dec036edd341d94e0f.json26
-rw-r--r--.sqlx/query-52aa7910b81c18c06673f57b7a8e81f7f317427b0c29b29d142bb50e4ef3c117.json26
-rw-r--r--.sqlx/query-5c53579fa431b6e184faf94eedb8229360ba78f2607d25e7e2ee5db5c759a5a3.json62
-rw-r--r--.sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json62
-rw-r--r--.sqlx/query-836a37fa3fbcacbc880c7c9e603b3087a17906db95d14a35034889a253f23418.json50
-rw-r--r--.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json62
-rw-r--r--.sqlx/query-9f83f0365b174ce61adb6d48f3b75adcb3ddf8a88439aa8ddb0c66be2f0b384e.json50
-rw-r--r--.sqlx/query-ae9d1c997891f7e917cde554a251e76e93d3a43d1447faa4ec4085f7b3f60404.json (renamed from .sqlx/query-d3938e57aaf0ef5f9f9300eed9467f4a01053e8abbdd0610a3eafb623fd355c9.json)4
-rw-r--r--.sqlx/query-afe4404b44bd8ce6c9a51aafabe3524caa9882d0cba0dc4359eb62cf8154c92d.json50
-rw-r--r--.sqlx/query-b09438f4b1247a4e3991751de1fa77f2a18537d30e61ccbdcc947d0dba2b3da3.json38
-rw-r--r--.sqlx/query-b991b34b491306780a1b6efa157b6ee50f32e1136ad9cbd91caa0add2ab3cdaa.json (renamed from .sqlx/query-a0c6dc71c26f5d1e64cb18de6303288ea5b7cb44e26af87a26c69cc203667123.json)16
-rw-r--r--.sqlx/query-e476c3fb3f2227b7421ad2075d18b090751a345ac2e90bb8c116e0fbb5dfbb99.json50
-rw-r--r--docs/api.md77
-rw-r--r--hi-ui/src/apiServer.js53
-rw-r--r--hi-ui/src/lib/Message.svelte10
-rw-r--r--hi-ui/src/routes/+page.svelte15
-rw-r--r--hi-ui/src/store.js2
-rw-r--r--hi-ui/src/store/channels.js4
-rw-r--r--hi-ui/src/store/logins.js22
-rw-r--r--hi-ui/src/store/messages.js29
-rw-r--r--migrations/20241009031441_login_created_at.sql217
-rw-r--r--src/app.rs22
-rw-r--r--src/boot/app.rs46
-rw-r--r--src/boot/mod.rs64
-rw-r--r--src/channel/app.rs2
-rw-r--r--src/channel/event.rs32
-rw-r--r--src/channel/history.rs18
-rw-r--r--src/channel/routes/test/on_create.rs10
-rw-r--r--src/channel/routes/test/on_send.rs16
-rw-r--r--src/channel/snapshot.rs8
-rw-r--r--src/event/app.rs12
-rw-r--r--src/event/mod.rs67
-rw-r--r--src/event/routes/test.rs32
-rw-r--r--src/event/sequence.rs4
-rw-r--r--src/login/app.rs24
-rw-r--r--src/login/event.rs36
-rw-r--r--src/login/history.rs47
-rw-r--r--src/login/mod.rs20
-rw-r--r--src/login/repo.rs92
-rw-r--r--src/login/routes/test/login.rs6
-rw-r--r--src/login/routes/test/logout.rs2
-rw-r--r--src/login/snapshot.rs49
-rw-r--r--src/message/app.rs2
-rw-r--r--src/message/event.rs30
-rw-r--r--src/message/history.rs17
-rw-r--r--src/message/repo.rs125
-rw-r--r--src/message/snapshot.rs49
-rw-r--r--src/test/fixtures/event.rs8
-rw-r--r--src/test/fixtures/filter.rs6
-rw-r--r--src/test/fixtures/login.rs9
-rw-r--r--src/token/app.rs49
-rw-r--r--src/token/broadcaster.rs3
-rw-r--r--src/token/event.rs10
-rw-r--r--src/token/mod.rs4
-rw-r--r--src/token/repo/auth.rs28
-rw-r--r--src/token/repo/token.rs9
59 files changed, 1270 insertions, 739 deletions
diff --git a/.sqlx/query-191255b9e55c9b36d0fd047e56e515d121964d3481e48aae3558b53a5123ce7d.json b/.sqlx/query-191255b9e55c9b36d0fd047e56e515d121964d3481e48aae3558b53a5123ce7d.json
new file mode 100644
index 0000000..fe443f9
--- /dev/null
+++ b/.sqlx/query-191255b9e55c9b36d0fd047e56e515d121964d3481e48aae3558b53a5123ce7d.json
@@ -0,0 +1,50 @@
+{
+ "db_name": "SQLite",
+ "query": "\n select\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n id as \"id: Id\",\n body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n where coalesce(sent_sequence <= $2, true)\n order by sent_sequence\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "channel: channel::Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ },
+ {
+ "name": "sender: login::Id",
+ "ordinal": 1,
+ "type_info": "Text"
+ },
+ {
+ "name": "id: Id",
+ "ordinal": 2,
+ "type_info": "Text"
+ },
+ {
+ "name": "body",
+ "ordinal": 3,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent_at: DateTime",
+ "ordinal": 4,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent_sequence: Sequence",
+ "ordinal": 5,
+ "type_info": "Integer"
+ }
+ ],
+ "parameters": {
+ "Right": 1
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "191255b9e55c9b36d0fd047e56e515d121964d3481e48aae3558b53a5123ce7d"
+}
diff --git a/.sqlx/query-3fbec32aeb32c49e088f246c00151035dcf174cec137326b63e0cb0e4ae5cb60.json b/.sqlx/query-3fbec32aeb32c49e088f246c00151035dcf174cec137326b63e0cb0e4ae5cb60.json
new file mode 100644
index 0000000..01e72b2
--- /dev/null
+++ b/.sqlx/query-3fbec32aeb32c49e088f246c00151035dcf174cec137326b63e0cb0e4ae5cb60.json
@@ -0,0 +1,38 @@
+{
+ "db_name": "SQLite",
+ "query": "\n insert\n into login (id, name, password_hash, created_sequence, created_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n name,\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "id: Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ },
+ {
+ "name": "name",
+ "ordinal": 1,
+ "type_info": "Text"
+ },
+ {
+ "name": "created_sequence: Sequence",
+ "ordinal": 2,
+ "type_info": "Integer"
+ },
+ {
+ "name": "created_at: DateTime",
+ "ordinal": 3,
+ "type_info": "Text"
+ }
+ ],
+ "parameters": {
+ "Right": 5
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "3fbec32aeb32c49e088f246c00151035dcf174cec137326b63e0cb0e4ae5cb60"
+}
diff --git a/.sqlx/query-4224d5c1c4009e0d31b96bc7b1d9f6a2215c7c135720c1222170a1f6692c3a8a.json b/.sqlx/query-4224d5c1c4009e0d31b96bc7b1d9f6a2215c7c135720c1222170a1f6692c3a8a.json
new file mode 100644
index 0000000..767c217
--- /dev/null
+++ b/.sqlx/query-4224d5c1c4009e0d31b96bc7b1d9f6a2215c7c135720c1222170a1f6692c3a8a.json
@@ -0,0 +1,38 @@
+{
+ "db_name": "SQLite",
+ "query": "\n select\n id as \"id: Id\",\n name,\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where coalesce(login.created_sequence > $1, true)\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "id: Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ },
+ {
+ "name": "name",
+ "ordinal": 1,
+ "type_info": "Text"
+ },
+ {
+ "name": "created_sequence: Sequence",
+ "ordinal": 2,
+ "type_info": "Integer"
+ },
+ {
+ "name": "created_at: DateTime",
+ "ordinal": 3,
+ "type_info": "Text"
+ }
+ ],
+ "parameters": {
+ "Right": 1
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "4224d5c1c4009e0d31b96bc7b1d9f6a2215c7c135720c1222170a1f6692c3a8a"
+}
diff --git a/.sqlx/query-45449846ea98e892c6e58f2ba8e082c21c9e3d124e5340dec036edd341d94e0f.json b/.sqlx/query-45449846ea98e892c6e58f2ba8e082c21c9e3d124e5340dec036edd341d94e0f.json
deleted file mode 100644
index 2974cb0..0000000
--- a/.sqlx/query-45449846ea98e892c6e58f2ba8e082c21c9e3d124e5340dec036edd341d94e0f.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
- "db_name": "SQLite",
- "query": "\n\t\t\t\tinsert into message\n\t\t\t\t\t(id, channel, sender, sent_at, sent_sequence, body)\n\t\t\t\tvalues ($1, $2, $3, $4, $5, $6)\n\t\t\t\treturning\n\t\t\t\t\tid as \"id: Id\",\n\t\t\t\t\tbody\n\t\t\t",
- "describe": {
- "columns": [
- {
- "name": "id: Id",
- "ordinal": 0,
- "type_info": "Text"
- },
- {
- "name": "body",
- "ordinal": 1,
- "type_info": "Text"
- }
- ],
- "parameters": {
- "Right": 6
- },
- "nullable": [
- false,
- false
- ]
- },
- "hash": "45449846ea98e892c6e58f2ba8e082c21c9e3d124e5340dec036edd341d94e0f"
-}
diff --git a/.sqlx/query-52aa7910b81c18c06673f57b7a8e81f7f317427b0c29b29d142bb50e4ef3c117.json b/.sqlx/query-52aa7910b81c18c06673f57b7a8e81f7f317427b0c29b29d142bb50e4ef3c117.json
deleted file mode 100644
index 83a6c90..0000000
--- a/.sqlx/query-52aa7910b81c18c06673f57b7a8e81f7f317427b0c29b29d142bb50e4ef3c117.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
- "db_name": "SQLite",
- "query": "\n insert or fail\n into login (id, name, password_hash)\n values ($1, $2, $3)\n returning\n id as \"id: Id\",\n name\n ",
- "describe": {
- "columns": [
- {
- "name": "id: Id",
- "ordinal": 0,
- "type_info": "Text"
- },
- {
- "name": "name",
- "ordinal": 1,
- "type_info": "Text"
- }
- ],
- "parameters": {
- "Right": 3
- },
- "nullable": [
- false,
- false
- ]
- },
- "hash": "52aa7910b81c18c06673f57b7a8e81f7f317427b0c29b29d142bb50e4ef3c117"
-}
diff --git a/.sqlx/query-5c53579fa431b6e184faf94eedb8229360ba78f2607d25e7e2ee5db5c759a5a3.json b/.sqlx/query-5c53579fa431b6e184faf94eedb8229360ba78f2607d25e7e2ee5db5c759a5a3.json
deleted file mode 100644
index 4ca6786..0000000
--- a/.sqlx/query-5c53579fa431b6e184faf94eedb8229360ba78f2607d25e7e2ee5db5c759a5a3.json
+++ /dev/null
@@ -1,62 +0,0 @@
-{
- "db_name": "SQLite",
- "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where coalesce(message.sent_sequence > $1, true)\n ",
- "describe": {
- "columns": [
- {
- "name": "channel_id: channel::Id",
- "ordinal": 0,
- "type_info": "Text"
- },
- {
- "name": "channel_name",
- "ordinal": 1,
- "type_info": "Text"
- },
- {
- "name": "sender_id: login::Id",
- "ordinal": 2,
- "type_info": "Text"
- },
- {
- "name": "sender_name",
- "ordinal": 3,
- "type_info": "Text"
- },
- {
- "name": "id: Id",
- "ordinal": 4,
- "type_info": "Text"
- },
- {
- "name": "body",
- "ordinal": 5,
- "type_info": "Text"
- },
- {
- "name": "sent_at: DateTime",
- "ordinal": 6,
- "type_info": "Text"
- },
- {
- "name": "sent_sequence: Sequence",
- "ordinal": 7,
- "type_info": "Integer"
- }
- ],
- "parameters": {
- "Right": 1
- },
- "nullable": [
- false,
- false,
- false,
- false,
- false,
- false,
- false,
- false
- ]
- },
- "hash": "5c53579fa431b6e184faf94eedb8229360ba78f2607d25e7e2ee5db5c759a5a3"
-}
diff --git a/.sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json b/.sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json
deleted file mode 100644
index 257e1f6..0000000
--- a/.sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json
+++ /dev/null
@@ -1,62 +0,0 @@
-{
- "db_name": "SQLite",
- "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where message.id = $1\n ",
- "describe": {
- "columns": [
- {
- "name": "channel_id: channel::Id",
- "ordinal": 0,
- "type_info": "Text"
- },
- {
- "name": "channel_name",
- "ordinal": 1,
- "type_info": "Text"
- },
- {
- "name": "sender_id: login::Id",
- "ordinal": 2,
- "type_info": "Text"
- },
- {
- "name": "sender_name",
- "ordinal": 3,
- "type_info": "Text"
- },
- {
- "name": "id: Id",
- "ordinal": 4,
- "type_info": "Text"
- },
- {
- "name": "body",
- "ordinal": 5,
- "type_info": "Text"
- },
- {
- "name": "sent_at: DateTime",
- "ordinal": 6,
- "type_info": "Text"
- },
- {
- "name": "sent_sequence: Sequence",
- "ordinal": 7,
- "type_info": "Integer"
- }
- ],
- "parameters": {
- "Right": 1
- },
- "nullable": [
- false,
- false,
- false,
- false,
- false,
- false,
- false,
- false
- ]
- },
- "hash": "6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5"
-}
diff --git a/.sqlx/query-836a37fa3fbcacbc880c7c9e603b3087a17906db95d14a35034889a253f23418.json b/.sqlx/query-836a37fa3fbcacbc880c7c9e603b3087a17906db95d14a35034889a253f23418.json
new file mode 100644
index 0000000..2ac20b6
--- /dev/null
+++ b/.sqlx/query-836a37fa3fbcacbc880c7c9e603b3087a17906db95d14a35034889a253f23418.json
@@ -0,0 +1,50 @@
+{
+ "db_name": "SQLite",
+ "query": "\n\t\t\t\tinsert into message\n\t\t\t\t\t(id, channel, sender, sent_at, sent_sequence, body)\n\t\t\t\tvalues ($1, $2, $3, $4, $5, $6)\n\t\t\t\treturning\n\t\t\t\t\tid as \"id: Id\",\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n\t\t\t\t\tbody\n\t\t\t",
+ "describe": {
+ "columns": [
+ {
+ "name": "id: Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ },
+ {
+ "name": "channel: channel::Id",
+ "ordinal": 1,
+ "type_info": "Text"
+ },
+ {
+ "name": "sender: login::Id",
+ "ordinal": 2,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent_at: DateTime",
+ "ordinal": 3,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent_sequence: Sequence",
+ "ordinal": 4,
+ "type_info": "Integer"
+ },
+ {
+ "name": "body",
+ "ordinal": 5,
+ "type_info": "Text"
+ }
+ ],
+ "parameters": {
+ "Right": 6
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "836a37fa3fbcacbc880c7c9e603b3087a17906db95d14a35034889a253f23418"
+}
diff --git a/.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json b/.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json
deleted file mode 100644
index 82246ac..0000000
--- a/.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json
+++ /dev/null
@@ -1,62 +0,0 @@
-{
- "db_name": "SQLite",
- "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where channel.id = $1\n and coalesce(message.sent_sequence <= $2, true)\n order by message.sent_sequence\n ",
- "describe": {
- "columns": [
- {
- "name": "channel_id: channel::Id",
- "ordinal": 0,
- "type_info": "Text"
- },
- {
- "name": "channel_name",
- "ordinal": 1,
- "type_info": "Text"
- },
- {
- "name": "sender_id: login::Id",
- "ordinal": 2,
- "type_info": "Text"
- },
- {
- "name": "sender_name",
- "ordinal": 3,
- "type_info": "Text"
- },
- {
- "name": "id: Id",
- "ordinal": 4,
- "type_info": "Text"
- },
- {
- "name": "body",
- "ordinal": 5,
- "type_info": "Text"
- },
- {
- "name": "sent_at: DateTime",
- "ordinal": 6,
- "type_info": "Text"
- },
- {
- "name": "sent_sequence: Sequence",
- "ordinal": 7,
- "type_info": "Integer"
- }
- ],
- "parameters": {
- "Right": 2
- },
- "nullable": [
- false,
- false,
- false,
- false,
- false,
- false,
- false,
- false
- ]
- },
- "hash": "9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca"
-}
diff --git a/.sqlx/query-9f83f0365b174ce61adb6d48f3b75adcb3ddf8a88439aa8ddb0c66be2f0b384e.json b/.sqlx/query-9f83f0365b174ce61adb6d48f3b75adcb3ddf8a88439aa8ddb0c66be2f0b384e.json
new file mode 100644
index 0000000..c8ce115
--- /dev/null
+++ b/.sqlx/query-9f83f0365b174ce61adb6d48f3b75adcb3ddf8a88439aa8ddb0c66be2f0b384e.json
@@ -0,0 +1,50 @@
+{
+ "db_name": "SQLite",
+ "query": "\n select\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n id as \"id: Id\",\n body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n where id = $1\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "channel: channel::Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ },
+ {
+ "name": "sender: login::Id",
+ "ordinal": 1,
+ "type_info": "Text"
+ },
+ {
+ "name": "id: Id",
+ "ordinal": 2,
+ "type_info": "Text"
+ },
+ {
+ "name": "body",
+ "ordinal": 3,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent_at: DateTime",
+ "ordinal": 4,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent_sequence: Sequence",
+ "ordinal": 5,
+ "type_info": "Integer"
+ }
+ ],
+ "parameters": {
+ "Right": 1
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "9f83f0365b174ce61adb6d48f3b75adcb3ddf8a88439aa8ddb0c66be2f0b384e"
+}
diff --git a/.sqlx/query-d3938e57aaf0ef5f9f9300eed9467f4a01053e8abbdd0610a3eafb623fd355c9.json b/.sqlx/query-ae9d1c997891f7e917cde554a251e76e93d3a43d1447faa4ec4085f7b3f60404.json
index 3d15b5c..cb345dc 100644
--- a/.sqlx/query-d3938e57aaf0ef5f9f9300eed9467f4a01053e8abbdd0610a3eafb623fd355c9.json
+++ b/.sqlx/query-ae9d1c997891f7e917cde554a251e76e93d3a43d1447faa4ec4085f7b3f60404.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n select\n token.id as \"token_id: Id\",\n login.id as \"login_id: login::Id\",\n name as \"login_name\"\n from login\n join token on login.id = token.login\n where token.secret = $1\n ",
+ "query": "\n select\n token.id as \"token_id: Id\",\n login.id as \"login_id: login::Id\",\n login.name as \"login_name\"\n from login\n join token on login.id = token.login\n where token.secret = $1\n ",
"describe": {
"columns": [
{
@@ -28,5 +28,5 @@
false
]
},
- "hash": "d3938e57aaf0ef5f9f9300eed9467f4a01053e8abbdd0610a3eafb623fd355c9"
+ "hash": "ae9d1c997891f7e917cde554a251e76e93d3a43d1447faa4ec4085f7b3f60404"
}
diff --git a/.sqlx/query-afe4404b44bd8ce6c9a51aafabe3524caa9882d0cba0dc4359eb62cf8154c92d.json b/.sqlx/query-afe4404b44bd8ce6c9a51aafabe3524caa9882d0cba0dc4359eb62cf8154c92d.json
new file mode 100644
index 0000000..cc8a074
--- /dev/null
+++ b/.sqlx/query-afe4404b44bd8ce6c9a51aafabe3524caa9882d0cba0dc4359eb62cf8154c92d.json
@@ -0,0 +1,50 @@
+{
+ "db_name": "SQLite",
+ "query": "\n select\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n id as \"id: Id\",\n body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n where channel = $1\n and coalesce(sent_sequence <= $2, true)\n order by sent_sequence\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "channel: channel::Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ },
+ {
+ "name": "sender: login::Id",
+ "ordinal": 1,
+ "type_info": "Text"
+ },
+ {
+ "name": "id: Id",
+ "ordinal": 2,
+ "type_info": "Text"
+ },
+ {
+ "name": "body",
+ "ordinal": 3,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent_at: DateTime",
+ "ordinal": 4,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent_sequence: Sequence",
+ "ordinal": 5,
+ "type_info": "Integer"
+ }
+ ],
+ "parameters": {
+ "Right": 2
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "afe4404b44bd8ce6c9a51aafabe3524caa9882d0cba0dc4359eb62cf8154c92d"
+}
diff --git a/.sqlx/query-b09438f4b1247a4e3991751de1fa77f2a18537d30e61ccbdcc947d0dba2b3da3.json b/.sqlx/query-b09438f4b1247a4e3991751de1fa77f2a18537d30e61ccbdcc947d0dba2b3da3.json
new file mode 100644
index 0000000..7c83aa1
--- /dev/null
+++ b/.sqlx/query-b09438f4b1247a4e3991751de1fa77f2a18537d30e61ccbdcc947d0dba2b3da3.json
@@ -0,0 +1,38 @@
+{
+ "db_name": "SQLite",
+ "query": "\n select\n id as \"id: Id\",\n name,\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where coalesce(created_sequence <= $1, true)\n order by created_sequence\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "id: Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ },
+ {
+ "name": "name",
+ "ordinal": 1,
+ "type_info": "Text"
+ },
+ {
+ "name": "created_sequence: Sequence",
+ "ordinal": 2,
+ "type_info": "Integer"
+ },
+ {
+ "name": "created_at: DateTime",
+ "ordinal": 3,
+ "type_info": "Text"
+ }
+ ],
+ "parameters": {
+ "Right": 1
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "b09438f4b1247a4e3991751de1fa77f2a18537d30e61ccbdcc947d0dba2b3da3"
+}
diff --git a/.sqlx/query-a0c6dc71c26f5d1e64cb18de6303288ea5b7cb44e26af87a26c69cc203667123.json b/.sqlx/query-b991b34b491306780a1b6efa157b6ee50f32e1136ad9cbd91caa0add2ab3cdaa.json
index 58c9c94..3901207 100644
--- a/.sqlx/query-a0c6dc71c26f5d1e64cb18de6303288ea5b7cb44e26af87a26c69cc203667123.json
+++ b/.sqlx/query-b991b34b491306780a1b6efa157b6ee50f32e1136ad9cbd91caa0add2ab3cdaa.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n\t\t\t\tselect\n\t\t\t\t\tid as \"id: login::Id\",\n\t\t\t\t\tname,\n\t\t\t\t\tpassword_hash as \"password_hash: StoredHash\"\n\t\t\t\tfrom login\n\t\t\t\twhere name = $1\n\t\t\t",
+ "query": "\n\t\t\t\tselect\n\t\t\t\t\tid as \"id: login::Id\",\n\t\t\t\t\tname,\n\t\t\t\t\tpassword_hash as \"password_hash: StoredHash\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n\t\t\t\tfrom login\n\t\t\t\twhere name = $1\n\t\t\t",
"describe": {
"columns": [
{
@@ -17,6 +17,16 @@
"name": "password_hash: StoredHash",
"ordinal": 2,
"type_info": "Text"
+ },
+ {
+ "name": "created_sequence: Sequence",
+ "ordinal": 3,
+ "type_info": "Integer"
+ },
+ {
+ "name": "created_at: DateTime",
+ "ordinal": 4,
+ "type_info": "Text"
}
],
"parameters": {
@@ -25,8 +35,10 @@
"nullable": [
false,
false,
+ false,
+ false,
false
]
},
- "hash": "a0c6dc71c26f5d1e64cb18de6303288ea5b7cb44e26af87a26c69cc203667123"
+ "hash": "b991b34b491306780a1b6efa157b6ee50f32e1136ad9cbd91caa0add2ab3cdaa"
}
diff --git a/.sqlx/query-e476c3fb3f2227b7421ad2075d18b090751a345ac2e90bb8c116e0fbb5dfbb99.json b/.sqlx/query-e476c3fb3f2227b7421ad2075d18b090751a345ac2e90bb8c116e0fbb5dfbb99.json
new file mode 100644
index 0000000..821bd8f
--- /dev/null
+++ b/.sqlx/query-e476c3fb3f2227b7421ad2075d18b090751a345ac2e90bb8c116e0fbb5dfbb99.json
@@ -0,0 +1,50 @@
+{
+ "db_name": "SQLite",
+ "query": "\n select\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n id as \"id: Id\",\n body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n where coalesce(message.sent_sequence > $1, true)\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "channel: channel::Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ },
+ {
+ "name": "sender: login::Id",
+ "ordinal": 1,
+ "type_info": "Text"
+ },
+ {
+ "name": "id: Id",
+ "ordinal": 2,
+ "type_info": "Text"
+ },
+ {
+ "name": "body",
+ "ordinal": 3,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent_at: DateTime",
+ "ordinal": 4,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent_sequence: Sequence",
+ "ordinal": 5,
+ "type_info": "Integer"
+ }
+ ],
+ "parameters": {
+ "Right": 1
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "e476c3fb3f2227b7421ad2075d18b090751a345ac2e90bb8c116e0fbb5dfbb99"
+}
diff --git a/docs/api.md b/docs/api.md
index 91485f3..7414ccf 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -25,23 +25,25 @@ Returns information needed to boot the client. Also the recommended way to check
"id": "L1234abcd",
},
"resume_point": "1312",
+ "logins": [
+ {
+ "id": "L1234abcd",
+ "name": "example username"
+ }
+ ],
"channels": [
{
"name": "nonsense and such",
"id": "C1234abcd",
- "messages": [
- {
- "at": "2024-09-27T23:19:10.208147Z",
- "sender": {
- "id": "L1234abcd",
- "name": "example username"
- },
- "message": {
- "id": "M1312acab",
- "body": "beep"
- }
- }
- ]
+ }
+ ],
+ "messages": [
+ {
+ "at": "2024-09-27T23:19:10.208147Z",
+ "channel": "C1234abcd",
+ "sender": "L1234abcd",
+ "id": "M1312acab",
+ "body": "beep"
}
]
}
@@ -200,50 +202,49 @@ The event IDs `hi` sends in `application/event-stream` encoding are ephemeral, a
The returned event stream is a sequence of events:
```json
+id: 1232
+data: {
+data: "type": "login",
+data: "event": "created",
+data: "at": "2024-09-27T23:17:10.208147Z",
+data: "id": "L1234abcd",
+data: "name": "example username"
+data: }
+
id: 1233
data: {
-data: "type": "created",
+data: "type": "channel",
+data: "event": "created",
data: "at": "2024-09-27T23:18:10.208147Z",
-data: "channel": {
-data: "id": "C9876cyyz",
-data: "name": "example channel 2"
-data: }
+data: "id": "C9876cyyz",
+data: "name": "example channel 2"
data: }
id: 1234
data: {
data: "type": "message",
+data: "event": "sent",
data: "at": "2024-09-27T23:19:10.208147Z",
-data: "channel": {
-data: "id": "C9876cyyz",
-data: "name": "example channel 2"
-data: },
-data: "sender": {
-data: "id": "L1234abcd",
-data: "name": "example username"
-data: },
-data: "message": {
-data: "id": "M1312acab",
-data: "body": "beep"
-data: }
+data: "channel": "C9876cyyz",
+data: "sender": "L1234abcd",
+data: "id": "M1312acab",
+data: "body": "beep"
data: }
id: 1235
data: {
+data: "type": "message",
+data: "event": "deleted",
data: "at": "2024-09-28T02:44:27.077355Z",
-data: "channel": {
-data: "id": "C9876cyyz",
-data: "name": "example channel 2"
-data: },
-data: "type": "message_deleted",
-data: "message": "M1312acab"
+data: "id": "M1312acab"
data: }
id: 1236
data: {
+data: "type": "channel",
+data: "event": "deleted",
data: "at": "2024-09-28T03:40:25.384318Z",
-data: "type": "deleted",
-data: "channel": "C9876cyyz"
+data: "id": "C9876cyyz"
data: }
```
diff --git a/hi-ui/src/apiServer.js b/hi-ui/src/apiServer.js
index f4a89a4..3aa3f1b 100644
--- a/hi-ui/src/apiServer.js
+++ b/hi-ui/src/apiServer.js
@@ -1,5 +1,5 @@
import axios from 'axios';
-import { activeChannel, channelsList, messages } from './store';
+import { activeChannel, channelsList, loginsList, messages } from './store';
export const apiServer = axios.create({
baseURL: '/api/',
@@ -55,22 +55,47 @@ export function subscribeToEvents(resume_point) {
const data = JSON.parse(evt.data);
switch (data.type) {
- case 'created':
- channelsList.update((value) => value.addChannel(data.channel))
+ case 'login':
+ onLoginEvent(data);
break;
- case 'message':
- messages.update((value) => value.addMessage(data));
- break;
- case 'message_deleted':
- messages.update((value) => value.deleteMessage(data.channel.id, data.message));
+ case 'channel':
+ onChannelEvent(data);
break;
- case 'deleted':
- activeChannel.update((value) => value.deleteChannel(data.channel));
- channelsList.update((value) => value.deleteChannel(data.channel));
- messages.update((value) => value.deleteChannel(data.channel));
- break;
- default:
+ case 'message':
+ onMessageEvent(data);
break;
}
}
}
+
+function onLoginEvent(data) {
+ switch (data.event) {
+ case 'created':
+ logins.update((value) => value.addLogin(data.id, data.name))
+ break;
+ }
+}
+
+function onChannelEvent(data) {
+ switch (data.event) {
+ case 'created':
+ channelsList.update((value) => value.addChannel(data.id, data.name))
+ break;
+ case 'deleted':
+ activeChannel.update((value) => value.deleteChannel(data.id));
+ channelsList.update((value) => value.deleteChannel(data.id));
+ messages.update((value) => value.deleteChannel(data.id));
+ break;
+ }
+}
+
+function onMessageEvent(data) {
+ switch (data.event) {
+ case 'sent':
+ messages.update((value) => value.addMessage(data.channel, data.id, data.at, data.sender, data.body));
+ break;
+ case 'deleted':
+ messages.update((value) => value.deleteMessage(data.id));
+ break;
+ }
+}
diff --git a/hi-ui/src/lib/Message.svelte b/hi-ui/src/lib/Message.svelte
index efc6641..d10bee3 100644
--- a/hi-ui/src/lib/Message.svelte
+++ b/hi-ui/src/lib/Message.svelte
@@ -1,23 +1,25 @@
<script>
import SvelteMarkdown from 'svelte-markdown';
- import { currentUser } from '../store';
+ import { currentUser, logins } from '../store';
import { deleteMessage } from '../apiServer';
export let at; // XXX: Omitted for now.
export let sender;
- export let message;
+ export let body;
let timestamp = new Date(at).toTimeString();
+ let name;
+ $: name = $logins.get(sender);
</script>
<div class="card card-hover m-4 relative">
<span class="chip variant-soft sticky top-o left-0">
<!-- TODO: should this show up for only the first of a run? -->
- @{sender.name}:
+ @{name}:
</span>
<span class="timestamp chip variant-soft absolute top-0 right-0">{at}</span>
<section class="p-4">
- <SvelteMarkdown source={message.body} />
+ <SvelteMarkdown source={body} />
</section>
</div>
diff --git a/hi-ui/src/routes/+page.svelte b/hi-ui/src/routes/+page.svelte
index 932582d..dd5f2f7 100644
--- a/hi-ui/src/routes/+page.svelte
+++ b/hi-ui/src/routes/+page.svelte
@@ -2,7 +2,7 @@
import { onMount } from 'svelte';
import { boot, subscribeToEvents } from '../apiServer';
- import { currentUser, channelsList, messages } from '../store';
+ import { currentUser, logins, channelsList, messages } from '../store';
import ActiveChannel from '../lib/ActiveChannel.svelte';
import ChannelList from '../lib/ChannelList.svelte';
@@ -22,15 +22,9 @@
id: boot.login.id,
username: boot.login.name,
}));
- let channels = boot.channels.map((channel) => ({
- id: channel.id,
- name: channel.name,
- }));
- channelsList.update((value) => value.setChannels(channels));
- let bootMessages = boot.channels.map((channel) => [channel.id, channel.messages]);
- for (let [channel, channelMessages] of bootMessages) {
- messages.update((value) => value.addMessages(channel, channelMessages));
- }
+ logins.update((value) => value.setLogins(boot.logins));
+ channelsList.update((value) => value.setChannels(boot.channels));
+ messages.update((value) => value.setMessages(boot.messages));
}
onMount(async () => {
@@ -38,6 +32,7 @@
let response = await boot();
switch (response.status) {
case 200:
+ debugger;
onBooted(response.data);
subscribeToEvents(response.data.resume_point);
break;
diff --git a/hi-ui/src/store.js b/hi-ui/src/store.js
index 4e6b4f1..1b3dfca 100644
--- a/hi-ui/src/store.js
+++ b/hi-ui/src/store.js
@@ -1,8 +1,10 @@
import { writable } from 'svelte/store';
import { ActiveChannel, Channels } from './store/channels';
import { Messages } from './store/messages';
+import { Logins } from './store/logins';
export const currentUser = writable(null);
export const activeChannel = writable(new ActiveChannel());
+export const logins = writable(new Logins());
export const channelsList = writable(new Channels());
export const messages = writable(new Messages());
diff --git a/hi-ui/src/store/channels.js b/hi-ui/src/store/channels.js
index 20702cc..bb6c86c 100644
--- a/hi-ui/src/store/channels.js
+++ b/hi-ui/src/store/channels.js
@@ -9,8 +9,8 @@ export class Channels {
return this;
}
- addChannel(channel) {
- this.channels = [...this.channels, channel];
+ addChannel(id, name) {
+ this.channels = [...this.channels, { id, name }];
this.sort();
return this;
}
diff --git a/hi-ui/src/store/logins.js b/hi-ui/src/store/logins.js
new file mode 100644
index 0000000..5b45206
--- /dev/null
+++ b/hi-ui/src/store/logins.js
@@ -0,0 +1,22 @@
+export class Logins {
+ constructor() {
+ this.logins = {};
+ }
+
+ addLogin(id, name) {
+ this.logins[id] = name;
+ return this;
+ }
+
+ setLogins(logins) {
+ this.logins = {};
+ for (let { id, name } of logins) {
+ this.addLogin(id, name);
+ }
+ return this;
+ }
+
+ get(id) {
+ return this.logins[id];
+ }
+}
diff --git a/hi-ui/src/store/messages.js b/hi-ui/src/store/messages.js
index 560b9e1..931b8fb 100644
--- a/hi-ui/src/store/messages.js
+++ b/hi-ui/src/store/messages.js
@@ -4,28 +4,31 @@ export class Messages {
}
inChannel(channel) {
- return this.channels[channel] || [];
+ return this.channels[channel] = (this.channels[channel] || []);
}
- addMessage(message) {
- let {
- channel,
- ...payload
- } = message;
- let channel_id = channel.id;
- this.updateChannel(channel_id, (messages) => [...messages, payload]);
+ addMessage(channel, id, at, sender, body) {
+ this.updateChannel(channel, (messages) => [...messages, { id, at, sender, body }]);
return this;
}
- addMessages(channel, payloads) {
- this.updateChannel(channel, (messages) => [...messages, ...payloads]);
+ setMessages(messages) {
+ this.channels = {};
+ for (let { channel, id, at, sender, body } of messages) {
+ this.inChannel(channel).push({ id, at, sender, body, });
+ }
+ for (let channel in this.channels) {
+ this.channels[channel].sort((a, b) => a.at - b.at);
+ }
return this;
}
- deleteMessage(channel, message) {
- let messages = this.messages(channel).filter((msg) => msg.message.id != message);
- this.channels[channel] = messages;
+ deleteMessage(message) {
+ for (let channel in this.channels) {
+ this.updateChannel(channel, (messages) => messages.filter((msg) => msg.id != message));
+ }
+ return this;
}
deleteChannel(id) {
diff --git a/migrations/20241009031441_login_created_at.sql b/migrations/20241009031441_login_created_at.sql
new file mode 100644
index 0000000..001c48e
--- /dev/null
+++ b/migrations/20241009031441_login_created_at.sql
@@ -0,0 +1,217 @@
+-- figure out new event seqeuence.
+--
+-- The main problem here is ensuring that the resulting stream is _consistent_.
+-- For any message, the login it refers to should be created in the event stream
+-- before the message does, and the channel it refers to should likewise be
+-- created first.
+--
+-- Messages come after channels by time (clock slew allowing), so we can reuse
+-- those timestamps to recover a global ordering between channels and messages.
+--
+-- We synthesize a login's created_at from the earliest of:
+--
+-- * the earliest message they sent that we know of, or
+-- * right now (no messages in recorded history).
+--
+-- This produces a consistent sequence, up to clock slew, at the expense of
+-- renumbering every event.
+
+create table unsequenced (
+ at text
+ not null,
+ login text
+ references login (id),
+ channel text
+ references channel (id),
+ message text
+ references message (id),
+ check (
+ (login is not null and channel is null and message is null)
+ or (login is null and channel is not null and message is null)
+ or (login is null and channel is null and message is not null)
+ )
+);
+
+insert into unsequenced (at, login)
+select
+ coalesce (
+ min(message.sent_at),
+ strftime('%FT%R:%f+00:00', 'now', 'utc')
+ ),
+ login.id
+from login
+left join message
+ on login.id = message.sender;
+
+insert into unsequenced (at, channel)
+select created_at, id
+from channel;
+
+insert into unsequenced (at, message)
+select sent_at, id
+from message;
+
+create table event (
+ at text
+ not null,
+ sequence
+ primary key
+ not null,
+ login text
+ references login (id),
+ channel text
+ references channel (id),
+ message text
+ references message (id),
+ check (
+ (login is not null and channel is null and message is null)
+ or (login is null and channel is not null and message is null)
+ or (login is null and channel is null and message is not null)
+ )
+);
+
+insert into event (at, sequence, login, channel, message)
+select
+ at, row_number() over (order by at, login is null, message is null), login, channel, message
+from unsequenced;
+
+-- Get this out of memory.
+drop table unsequenced;
+
+-- Because of how foundational `login` is, we pretty much have to recreate the
+-- whole schema.
+alter table message
+rename to old_message;
+
+alter table channel
+rename to old_channel;
+
+alter table token
+rename to old_token;
+
+alter table login
+rename to old_login;
+
+create table login (
+ id text
+ not null
+ primary key,
+ name text
+ not null
+ unique,
+ password_hash text
+ not null,
+ created_sequence bigint
+ unique
+ not null,
+ created_at text
+ not null
+);
+
+create table token (
+ id text
+ not null
+ primary key,
+ secret text
+ not null
+ unique,
+ login text
+ not null
+ references login (id),
+ issued_at text
+ not null,
+ last_used_at text
+ not null
+);
+
+create table channel (
+ id text
+ not null
+ primary key,
+ name text
+ unique
+ not null,
+ created_sequence bigint
+ unique
+ not null,
+ created_at text
+ not null
+);
+
+create table message (
+ id text
+ not null
+ primary key,
+ channel text
+ not null
+ references channel (id),
+ sender text
+ not null
+ references login (id),
+ sent_sequence bigint
+ unique
+ not null,
+ sent_at text
+ not null,
+ body text
+ not null
+);
+
+-- Copy data from the original tables, assigning sequence numbers as we go.
+insert into login (id, name, password_hash, created_sequence, created_at)
+select
+ old.id,
+ old.name,
+ old.password_hash,
+ event.sequence,
+ event.at
+from old_login as old
+join event on old.id = event.login
+order by event.sequence;
+
+insert into token (id, secret, login, issued_at, last_used_at)
+select id, secret, login, issued_at, last_used_at from old_token;
+
+insert into channel (id, name, created_sequence, created_at)
+select
+ old.id,
+ old.name,
+ event.sequence,
+ old.created_at
+from old_channel as old
+join event on old.id = event.channel
+order by event.sequence;
+
+insert into message (id, channel, sender, sent_sequence, sent_at, body)
+select
+ old.id,
+ old.channel,
+ old.sender,
+ event.sequence,
+ old.sent_at,
+ old.body
+from old_message as old
+join event on old.id = event.message
+order by event.sequence;
+
+-- Restart the event sequence, using the highest sequence number in the new
+-- event series.
+update event_sequence
+set last_value = (select coalesce(max(sequence), 0) from event);
+
+-- Clean up the now-unused original tables, plus the resequencing temp table
+drop table event;
+drop table old_message;
+drop table old_channel;
+drop table old_token;
+drop table old_login;
+
+-- Reindex, now that the original indices are no longer in the way
+create index token_issued_at
+on token (issued_at);
+
+create index message_sent_at
+on message (sent_at);
+
+create index channel_created_at
+on channel (created_at);
diff --git a/src/app.rs b/src/app.rs
index 6d007a9..177c134 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -3,9 +3,9 @@ use sqlx::sqlite::SqlitePool;
use crate::{
boot::app::Boot,
channel::app::Channels,
- event::{app::Events, broadcaster::Broadcaster as EventBroadcaster},
+ event::{self, app::Events},
message::app::Messages,
- token::{app::Tokens, broadcaster::Broadcaster as TokenBroadcaster},
+ token::{self, app::Tokens},
};
#[cfg(test)]
@@ -14,15 +14,19 @@ use crate::login::app::Logins;
#[derive(Clone)]
pub struct App {
db: SqlitePool,
- events: EventBroadcaster,
- tokens: TokenBroadcaster,
+ events: event::Broadcaster,
+ token_events: token::Broadcaster,
}
impl App {
pub fn from(db: SqlitePool) -> Self {
- let events = EventBroadcaster::default();
- let tokens = TokenBroadcaster::default();
- Self { db, events, tokens }
+ let events = event::Broadcaster::default();
+ let token_events = token::Broadcaster::default();
+ Self {
+ db,
+ events,
+ token_events,
+ }
}
}
@@ -41,7 +45,7 @@ impl App {
#[cfg(test)]
pub const fn logins(&self) -> Logins {
- Logins::new(&self.db)
+ Logins::new(&self.db, &self.events)
}
pub const fn messages(&self) -> Messages {
@@ -49,6 +53,6 @@ impl App {
}
pub const fn tokens(&self) -> Tokens {
- Tokens::new(&self.db, &self.tokens)
+ Tokens::new(&self.db, &self.events, &self.token_events)
}
}
diff --git a/src/boot/app.rs b/src/boot/app.rs
index fc84b3a..ef48b2f 100644
--- a/src/boot/app.rs
+++ b/src/boot/app.rs
@@ -1,8 +1,9 @@
use sqlx::sqlite::SqlitePool;
-use super::{Channel, Snapshot};
+use super::Snapshot;
use crate::{
- channel::repo::Provider as _, event::repo::Provider as _, message::repo::Provider as _,
+ channel::repo::Provider as _, event::repo::Provider as _, login::repo::Provider as _,
+ message::repo::Provider as _,
};
pub struct Boot<'a> {
@@ -17,38 +18,33 @@ impl<'a> Boot<'a> {
pub async fn snapshot(&self) -> Result<Snapshot, sqlx::Error> {
let mut tx = self.db.begin().await?;
let resume_point = tx.sequence().current().await?;
- let channels = tx.channels().all(resume_point.into()).await?;
-
- let channels = {
- let mut snapshots = Vec::with_capacity(channels.len());
-
- let channels = channels.into_iter().filter_map(|channel| {
- channel
- .as_of(resume_point)
- .map(|snapshot| (channel, snapshot))
- });
- for (channel, snapshot) in channels {
- let messages = tx
- .messages()
- .in_channel(&channel, resume_point.into())
- .await?;
+ let logins = tx.logins().all(resume_point.into()).await?;
+ let channels = tx.channels().all(resume_point.into()).await?;
+ let messages = tx.messages().all(resume_point.into()).await?;
- let messages = messages
- .into_iter()
- .filter_map(|message| message.as_of(resume_point));
+ tx.commit().await?;
- snapshots.push(Channel::new(snapshot, messages));
- }
+ let logins = logins
+ .into_iter()
+ .filter_map(|login| login.as_of(resume_point))
+ .collect();
- snapshots
- };
+ let channels = channels
+ .into_iter()
+ .filter_map(|channel| channel.as_of(resume_point))
+ .collect();
- tx.commit().await?;
+ let messages = messages
+ .into_iter()
+ .filter_map(|message| message.as_of(resume_point))
+ .collect();
Ok(Snapshot {
resume_point,
+ logins,
channels,
+ messages,
})
}
}
diff --git a/src/boot/mod.rs b/src/boot/mod.rs
index bd0da0a..ed4764a 100644
--- a/src/boot/mod.rs
+++ b/src/boot/mod.rs
@@ -1,74 +1,14 @@
pub mod app;
mod routes;
-use crate::{
- channel,
- event::{Instant, Sequence},
- login::Login,
- message,
-};
+use crate::{channel::Channel, event::Sequence, login::Login, message::Message};
pub use self::routes::router;
#[derive(serde::Serialize)]
pub struct Snapshot {
pub resume_point: Sequence,
+ pub logins: Vec<Login>,
pub channels: Vec<Channel>,
-}
-
-#[derive(serde::Serialize)]
-pub struct Channel {
- pub id: channel::Id,
- pub name: String,
pub messages: Vec<Message>,
}
-
-impl Channel {
- fn new(
- channel: channel::Channel,
- messages: impl IntoIterator<Item = message::Message>,
- ) -> Self {
- // The declarations are like this to guarantee that we aren't omitting any important fields from the corresponding types.
- let channel::Channel { id, name } = channel;
-
- Self {
- id,
- name,
- messages: messages.into_iter().map(Message::from).collect(),
- }
- }
-}
-
-#[derive(serde::Serialize)]
-pub struct Message {
- #[serde(flatten)]
- pub sent: Instant,
- pub sender: Login,
- // Named this way for serialization reasons
- #[allow(clippy::struct_field_names)]
- pub message: Body,
-}
-
-impl From<message::Message> for Message {
- fn from(message: message::Message) -> Self {
- let message::Message {
- sent,
- channel: _,
- sender,
- id,
- body,
- } = message;
-
- Self {
- sent,
- sender,
- message: Body { id, body },
- }
- }
-}
-
-#[derive(serde::Serialize)]
-pub struct Body {
- id: message::Id,
- body: String,
-}
diff --git a/src/channel/app.rs b/src/channel/app.rs
index a9a9e84..cb7ad32 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -6,7 +6,7 @@ use super::{repo::Provider as _, Channel, Id};
use crate::{
clock::DateTime,
db::NotFound,
- event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence},
+ event::{repo::Provider as _, Broadcaster, Event, Sequence},
message::repo::Provider as _,
};
diff --git a/src/channel/event.rs b/src/channel/event.rs
index 9c54174..f3dca3e 100644
--- a/src/channel/event.rs
+++ b/src/channel/event.rs
@@ -5,32 +5,30 @@ use crate::{
};
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Event {
- #[serde(flatten)]
- pub instant: Instant,
- #[serde(flatten)]
- pub kind: Kind,
+#[serde(tag = "event", rename_all = "snake_case")]
+pub enum Event {
+ Created(Created),
+ Deleted(Deleted),
}
impl Sequenced for Event {
fn instant(&self) -> Instant {
- self.instant
+ match self {
+ Self::Created(event) => event.instant,
+ Self::Deleted(event) => event.instant,
+ }
}
}
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-#[serde(tag = "type", rename_all = "snake_case")]
-pub enum Kind {
- Created(Created),
- Deleted(Deleted),
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Created {
+ #[serde(flatten)]
+ pub instant: Instant,
+ #[serde(flatten)]
pub channel: Channel,
}
-impl From<Created> for Kind {
+impl From<Created> for Event {
fn from(event: Created) -> Self {
Self::Created(event)
}
@@ -38,10 +36,12 @@ impl From<Created> for Kind {
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Deleted {
- pub channel: channel::Id,
+ #[serde(flatten)]
+ pub instant: Instant,
+ pub id: channel::Id,
}
-impl From<Deleted> for Kind {
+impl From<Deleted> for Event {
fn from(event: Deleted) -> Self {
Self::Deleted(event)
}
diff --git a/src/channel/history.rs b/src/channel/history.rs
index 383fb7b..78b3437 100644
--- a/src/channel/history.rs
+++ b/src/channel/history.rs
@@ -40,22 +40,20 @@ impl History {
}
fn created(&self) -> Event {
- Event {
+ Created {
instant: self.created,
- kind: Created {
- channel: self.channel.clone(),
- }
- .into(),
+ channel: self.channel.clone(),
}
+ .into()
}
fn deleted(&self) -> Option<Event> {
- self.deleted.map(|instant| Event {
- instant,
- kind: Deleted {
- channel: self.channel.id.clone(),
+ self.deleted.map(|instant| {
+ Deleted {
+ instant,
+ id: self.channel.id.clone(),
}
- .into(),
+ .into()
})
}
}
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index ed49017..eeecc7f 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -2,7 +2,7 @@ use axum::extract::{Json, State};
use futures::stream::StreamExt as _;
use crate::{
- channel::{app, routes},
+ channel::{self, app, routes},
event,
test::fixtures::{self, future::Immediately as _},
};
@@ -12,7 +12,7 @@ async fn new_channel() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let creator = fixtures::login::create(&app).await;
+ let creator = fixtures::login::create(&app, &fixtures::now()).await;
// Call the endpoint
@@ -53,8 +53,8 @@ async fn new_channel() {
.expect("creation event published");
assert!(matches!(
- event.kind,
- event::Kind::ChannelCreated(event)
+ event,
+ event::Event::Channel(channel::Event::Created(event))
if event.channel == response_channel
));
}
@@ -64,7 +64,7 @@ async fn duplicate_name() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let creator = fixtures::login::create(&app).await;
+ let creator = fixtures::login::create(&app, &fixtures::now()).await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
// Call the endpoint
diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs
index 3297093..3fe3d1e 100644
--- a/src/channel/routes/test/on_send.rs
+++ b/src/channel/routes/test/on_send.rs
@@ -4,8 +4,8 @@ use futures::stream::StreamExt;
use crate::{
channel,
channel::routes,
- event,
- message::app::SendError,
+ event::{self, Sequenced},
+ message::{self, app::SendError},
test::fixtures::{self, future::Immediately as _},
};
@@ -14,7 +14,7 @@ async fn messages_in_order() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
// Call the endpoint (twice)
@@ -53,11 +53,11 @@ async fn messages_in_order() {
let events = events.collect::<Vec<_>>().immediately().await;
for ((sent_at, message), event) in requests.into_iter().zip(events) {
- assert_eq!(*sent_at, event.instant.at);
+ assert_eq!(*sent_at, event.at());
assert!(matches!(
- event.kind,
- event::Kind::MessageSent(event)
- if event.message.sender == sender
+ event,
+ event::Event::Message(message::Event::Sent(event))
+ if event.message.sender == sender.id
&& event.message.body == message
));
}
@@ -68,7 +68,7 @@ async fn nonexistent_channel() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let login = fixtures::login::create(&app).await;
+ let login = fixtures::login::create(&app, &fixtures::now()).await;
// Call the endpoint
diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs
index 6462f25..d4d1d27 100644
--- a/src/channel/snapshot.rs
+++ b/src/channel/snapshot.rs
@@ -1,5 +1,5 @@
use super::{
- event::{Created, Event, Kind},
+ event::{Created, Event},
Id,
};
@@ -11,9 +11,9 @@ pub struct Channel {
impl Channel {
fn apply(state: Option<Self>, event: Event) -> Option<Self> {
- match (state, event.kind) {
- (None, Kind::Created(event)) => Some(event.into()),
- (Some(channel), Kind::Deleted(event)) if channel.id == event.channel => None,
+ match (state, event) {
+ (None, Event::Created(event)) => Some(event.into()),
+ (Some(channel), Event::Deleted(event)) if channel.id == event.id => None,
(state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"),
}
}
diff --git a/src/event/app.rs b/src/event/app.rs
index 141037d..951ce25 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -9,6 +9,7 @@ use sqlx::sqlite::SqlitePool;
use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced};
use crate::{
channel::{self, repo::Provider as _},
+ login::{self, repo::Provider as _},
message::{self, repo::Provider as _},
};
@@ -33,6 +34,14 @@ impl<'a> Events<'a> {
let mut tx = self.db.begin().await?;
+ let logins = tx.logins().replay(resume_at).await?;
+ let login_events = logins
+ .iter()
+ .map(login::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::after(resume_at))
+ .map(Event::from);
+
let channels = tx.channels().replay(resume_at).await?;
let channel_events = channels
.iter()
@@ -49,7 +58,8 @@ impl<'a> Events<'a> {
.filter(Sequence::after(resume_at))
.map(Event::from);
- let replay_events = channel_events
+ let replay_events = login_events
+ .merge_by(channel_events, Sequence::merge)
.merge_by(message_events, Sequence::merge)
.collect::<Vec<_>>();
let resume_live_at = replay_events.last().map(Sequenced::sequence);
diff --git a/src/event/mod.rs b/src/event/mod.rs
index e748d66..69c7a10 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -1,13 +1,14 @@
-use crate::{channel, message};
+use crate::{channel, login, message};
pub mod app;
-pub mod broadcaster;
+mod broadcaster;
mod extract;
pub mod repo;
mod routes;
mod sequence;
pub use self::{
+ broadcaster::Broadcaster,
routes::router,
sequence::{Instant, Sequence, Sequenced},
};
@@ -15,63 +16,37 @@ pub use self::{
pub type ResumePoint = Option<Sequence>;
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Event {
- #[serde(flatten)]
- pub instant: Instant,
- #[serde(flatten)]
- pub kind: Kind,
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum Event {
+ Login(login::Event),
+ Channel(channel::Event),
+ Message(message::Event),
}
impl Sequenced for Event {
fn instant(&self) -> Instant {
- self.instant
- }
-}
-
-impl From<channel::Event> for Event {
- fn from(event: channel::Event) -> Self {
- Self {
- instant: event.instant,
- kind: event.kind.into(),
+ match self {
+ Self::Login(event) => event.instant(),
+ Self::Channel(event) => event.instant(),
+ Self::Message(event) => event.instant(),
}
}
}
-impl From<message::Event> for Event {
- fn from(event: message::Event) -> Self {
- Self {
- instant: event.instant(),
- kind: event.kind.into(),
- }
+impl From<login::Event> for Event {
+ fn from(event: login::Event) -> Self {
+ Self::Login(event)
}
}
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-#[serde(tag = "type", rename_all = "snake_case")]
-pub enum Kind {
- #[serde(rename = "created")]
- ChannelCreated(channel::event::Created),
- #[serde(rename = "message")]
- MessageSent(message::event::Sent),
- MessageDeleted(message::event::Deleted),
- #[serde(rename = "deleted")]
- ChannelDeleted(channel::event::Deleted),
-}
-
-impl From<channel::event::Kind> for Kind {
- fn from(kind: channel::event::Kind) -> Self {
- match kind {
- channel::event::Kind::Created(created) => Self::ChannelCreated(created),
- channel::event::Kind::Deleted(deleted) => Self::ChannelDeleted(deleted),
- }
+impl From<channel::Event> for Event {
+ fn from(event: channel::Event) -> Self {
+ Self::Channel(event)
}
}
-impl From<message::event::Kind> for Kind {
- fn from(kind: message::event::Kind) -> Self {
- match kind {
- message::event::Kind::Sent(created) => Self::MessageSent(created),
- message::event::Kind::Deleted(deleted) => Self::MessageDeleted(deleted),
- }
+impl From<message::Event> for Event {
+ fn from(event: message::Event) -> Self {
+ Self::Message(event)
}
}
diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs
index ba9953e..209a016 100644
--- a/src/event/routes/test.rs
+++ b/src/event/routes/test.rs
@@ -15,13 +15,13 @@ async fn includes_historical_message() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
// Call the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
.await
@@ -48,7 +48,7 @@ async fn includes_live_message() {
// Call the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
let routes::Events(events) =
routes::events(State(app.clone()), subscriber, None, Query::default())
@@ -57,7 +57,7 @@ async fn includes_live_message() {
// Verify the semantics
- let sender = fixtures::login::create(&app).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
let event = events
@@ -75,7 +75,7 @@ async fn includes_multiple_channels() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
let channels = [
fixtures::channel::create(&app, &fixtures::now()).await,
@@ -94,7 +94,7 @@ async fn includes_multiple_channels() {
// Call the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
.await
@@ -122,7 +122,7 @@ async fn sequential_messages() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
let messages = vec![
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
@@ -132,7 +132,7 @@ async fn sequential_messages() {
// Call the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
.await
@@ -166,7 +166,7 @@ async fn resumes_from() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
@@ -177,7 +177,7 @@ async fn resumes_from() {
// Call the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
let resume_at = {
@@ -248,13 +248,13 @@ async fn serial_resume() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
// Call the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
let resume_at = {
@@ -372,11 +372,11 @@ async fn terminates_on_token_expiry() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
// Subscribe via the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber =
fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await;
@@ -417,11 +417,11 @@ async fn terminates_on_logout() {
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::login::create(&app).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
// Subscribe via the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber_token =
fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::now()).await;
let subscriber =
diff --git a/src/event/sequence.rs b/src/event/sequence.rs
index ceb5bcb..bf6d5b8 100644
--- a/src/event/sequence.rs
+++ b/src/event/sequence.rs
@@ -72,6 +72,10 @@ impl Sequence {
pub trait Sequenced {
fn instant(&self) -> Instant;
+ fn at(&self) -> DateTime {
+ self.instant().at
+ }
+
fn sequence(&self) -> Sequence {
self.instant().into()
}
diff --git a/src/login/app.rs b/src/login/app.rs
index 4f60b89..bb1419b 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -1,24 +1,38 @@
use sqlx::sqlite::SqlitePool;
use super::{repo::Provider as _, Login, Password};
+use crate::{
+ clock::DateTime,
+ event::{repo::Provider as _, Broadcaster, Event},
+};
pub struct Logins<'a> {
db: &'a SqlitePool,
+ events: &'a Broadcaster,
}
impl<'a> Logins<'a> {
- pub const fn new(db: &'a SqlitePool) -> Self {
- Self { db }
+ pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self {
+ Self { db, events }
}
- pub async fn create(&self, name: &str, password: &Password) -> Result<Login, CreateError> {
+ pub async fn create(
+ &self,
+ name: &str,
+ password: &Password,
+ created_at: &DateTime,
+ ) -> Result<Login, CreateError> {
let password_hash = password.hash()?;
let mut tx = self.db.begin().await?;
- let login = tx.logins().create(name, &password_hash).await?;
+ let created = tx.sequence().next(created_at).await?;
+ let login = tx.logins().create(name, &password_hash, &created).await?;
tx.commit().await?;
- Ok(login)
+ self.events
+ .broadcast(login.events().map(Event::from).collect::<Vec<_>>());
+
+ Ok(login.as_created())
}
}
diff --git a/src/login/event.rs b/src/login/event.rs
new file mode 100644
index 0000000..b03451a
--- /dev/null
+++ b/src/login/event.rs
@@ -0,0 +1,36 @@
+use super::snapshot::Login;
+use crate::event::{Instant, Sequenced};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+#[serde(tag = "event", rename_all = "snake_case")]
+pub enum Event {
+ Created(Created),
+}
+
+impl Sequenced for Event {
+ fn instant(&self) -> Instant {
+ match self {
+ Self::Created(created) => created.instant(),
+ }
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Created {
+ #[serde(flatten)]
+ pub instant: Instant,
+ #[serde(flatten)]
+ pub login: Login,
+}
+
+impl Sequenced for Created {
+ fn instant(&self) -> Instant {
+ self.instant
+ }
+}
+
+impl From<Created> for Event {
+ fn from(event: Created) -> Self {
+ Self::Created(event)
+ }
+}
diff --git a/src/login/history.rs b/src/login/history.rs
new file mode 100644
index 0000000..add7d1e
--- /dev/null
+++ b/src/login/history.rs
@@ -0,0 +1,47 @@
+use super::{
+ event::{Created, Event},
+ Id, Login,
+};
+use crate::event::{Instant, ResumePoint, Sequence};
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct History {
+ pub login: Login,
+ pub created: Instant,
+}
+
+// State interface
+impl History {
+ pub fn id(&self) -> &Id {
+ &self.login.id
+ }
+
+ // Snapshot of this login as it was when created. (Note to the future: it's okay
+ // if this returns a redacted or modified version of the login. If we implement
+ // renames by redacting the original name, then this should return the edited login, not the original, even if that's not how it was "as created.")
+ #[cfg(test)]
+ pub fn as_created(&self) -> Login {
+ self.login.clone()
+ }
+
+ pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Login> {
+ self.events()
+ .filter(Sequence::up_to(resume_point.into()))
+ .collect()
+ }
+}
+
+// Events interface
+impl History {
+ fn created(&self) -> Event {
+ Created {
+ instant: self.created,
+ login: self.login.clone(),
+ }
+ .into()
+ }
+
+ pub fn events(&self) -> impl Iterator<Item = Event> {
+ [self.created()].into_iter()
+ }
+}
diff --git a/src/login/mod.rs b/src/login/mod.rs
index f272f80..98cc3d7 100644
--- a/src/login/mod.rs
+++ b/src/login/mod.rs
@@ -1,22 +1,14 @@
#[cfg(test)]
pub mod app;
+pub mod event;
pub mod extract;
+mod history;
mod id;
pub mod password;
pub mod repo;
mod routes;
+mod snapshot;
-pub use self::{id::Id, password::Password, routes::router};
-
-// This also implements FromRequestParts (see `./extract.rs`). As a result, it
-// can be used as an extractor for endpoints that want to require login, or for
-// endpoints that need to behave differently depending on whether the client is
-// or is not logged in.
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Login {
- pub id: Id,
- pub name: String,
- // The omission of the hashed password is deliberate, to minimize the
- // chance that it ends up tangled up in debug output or in some other chunk
- // of logic elsewhere.
-}
+pub use self::{
+ event::Event, history::History, id::Id, password::Password, routes::router, snapshot::Login,
+};
diff --git a/src/login/repo.rs b/src/login/repo.rs
index d1a02c4..6d6510c 100644
--- a/src/login/repo.rs
+++ b/src/login/repo.rs
@@ -1,6 +1,10 @@
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
-use crate::login::{password::StoredHash, Id, Login};
+use crate::{
+ clock::DateTime,
+ event::{Instant, ResumePoint, Sequence},
+ login::{password::StoredHash, History, Id, Login},
+};
pub trait Provider {
fn logins(&mut self) -> Logins;
@@ -19,28 +23,100 @@ impl<'c> Logins<'c> {
&mut self,
name: &str,
password_hash: &StoredHash,
- ) -> Result<Login, sqlx::Error> {
+ created: &Instant,
+ ) -> Result<History, sqlx::Error> {
let id = Id::generate();
- let login = sqlx::query_as!(
- Login,
+ let login = sqlx::query!(
r#"
- insert or fail
- into login (id, name, password_hash)
- values ($1, $2, $3)
+ insert
+ into login (id, name, password_hash, created_sequence, created_at)
+ values ($1, $2, $3, $4, $5)
returning
id as "id: Id",
- name
+ name,
+ created_sequence as "created_sequence: Sequence",
+ created_at as "created_at: DateTime"
"#,
id,
name,
password_hash,
+ created.sequence,
+ created.at,
)
+ .map(|row| History {
+ login: Login {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ })
.fetch_one(&mut *self.0)
.await?;
Ok(login)
}
+
+ pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> {
+ let channels = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_sequence as "created_sequence: Sequence",
+ created_at as "created_at: DateTime"
+ from login
+ where coalesce(created_sequence <= $1, true)
+ order by created_sequence
+ "#,
+ resume_at,
+ )
+ .map(|row| History {
+ login: Login {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ })
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
+ pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> {
+ let messages = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_sequence as "created_sequence: Sequence",
+ created_at as "created_at: DateTime"
+ from login
+ where coalesce(login.created_sequence > $1, true)
+ "#,
+ resume_at,
+ )
+ .map(|row| History {
+ login: Login {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ })
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(messages)
+ }
}
impl<'t> From<&'t mut SqliteConnection> for Logins<'t> {
diff --git a/src/login/routes/test/login.rs b/src/login/routes/test/login.rs
index 3c82738..6a3b79c 100644
--- a/src/login/routes/test/login.rs
+++ b/src/login/routes/test/login.rs
@@ -47,7 +47,7 @@ async fn existing_identity() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let (name, password) = fixtures::login::create_with_password(&app).await;
+ let (name, password) = fixtures::login::create_with_password(&app, &fixtures::now()).await;
// Call the endpoint
@@ -84,7 +84,7 @@ async fn authentication_failed() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let login = fixtures::login::create(&app).await;
+ let login = fixtures::login::create(&app, &fixtures::now()).await;
// Call the endpoint
@@ -109,7 +109,7 @@ async fn token_expires() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let (name, password) = fixtures::login::create_with_password(&app).await;
+ let (name, password) = fixtures::login::create_with_password(&app, &fixtures::now()).await;
// Call the endpoint
diff --git a/src/login/routes/test/logout.rs b/src/login/routes/test/logout.rs
index 42b2534..611829e 100644
--- a/src/login/routes/test/logout.rs
+++ b/src/login/routes/test/logout.rs
@@ -11,7 +11,7 @@ async fn successful() {
let app = fixtures::scratch_app().await;
let now = fixtures::now();
- let login = fixtures::login::create_with_password(&app).await;
+ let login = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let identity = fixtures::identity::logged_in(&app, &login, &now).await;
let secret = fixtures::identity::secret(&identity);
diff --git a/src/login/snapshot.rs b/src/login/snapshot.rs
new file mode 100644
index 0000000..1379005
--- /dev/null
+++ b/src/login/snapshot.rs
@@ -0,0 +1,49 @@
+use super::{
+ event::{Created, Event},
+ Id,
+};
+
+// This also implements FromRequestParts (see `./extract.rs`). As a result, it
+// can be used as an extractor for endpoints that want to require login, or for
+// endpoints that need to behave differently depending on whether the client is
+// or is not logged in.
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Login {
+ pub id: Id,
+ pub name: String,
+ // The omission of the hashed password is deliberate, to minimize the
+ // chance that it ends up tangled up in debug output or in some other chunk
+ // of logic elsewhere.
+}
+
+impl Login {
+ // Two reasons for this allow:
+ //
+ // 1. This is used to collect streams using a fold, below, which requires a type consistent with the fold, and
+ // 2. It's also consistent with the other history state machine types.
+ #[allow(clippy::unnecessary_wraps)]
+ fn apply(state: Option<Self>, event: Event) -> Option<Self> {
+ match (state, event) {
+ (None, Event::Created(event)) => Some(event.into()),
+ (state, event) => panic!("invalid message event {event:#?} for state {state:#?}"),
+ }
+ }
+}
+
+impl FromIterator<Event> for Option<Login> {
+ fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self {
+ events.into_iter().fold(None, Login::apply)
+ }
+}
+
+impl From<&Created> for Login {
+ fn from(event: &Created) -> Self {
+ event.login.clone()
+ }
+}
+
+impl From<Created> for Login {
+ fn from(event: Created) -> Self {
+ event.login
+ }
+}
diff --git a/src/message/app.rs b/src/message/app.rs
index 385c92e..3385af2 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -7,7 +7,7 @@ use crate::{
channel::{self, repo::Provider as _},
clock::DateTime,
db::NotFound as _,
- event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence},
+ event::{repo::Provider as _, Broadcaster, Event, Sequence},
login::Login,
};
diff --git a/src/message/event.rs b/src/message/event.rs
index 66db9b0..1cd5847 100644
--- a/src/message/event.rs
+++ b/src/message/event.rs
@@ -1,29 +1,14 @@
use super::{snapshot::Message, Id};
-use crate::{
- channel::Channel,
- event::{Instant, Sequenced},
-};
+use crate::event::{Instant, Sequenced};
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Event {
- #[serde(flatten)]
- pub kind: Kind,
-}
-
-impl Sequenced for Event {
- fn instant(&self) -> Instant {
- self.kind.instant()
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-#[serde(tag = "type", rename_all = "snake_case")]
-pub enum Kind {
+#[serde(tag = "event", rename_all = "snake_case")]
+pub enum Event {
Sent(Sent),
Deleted(Deleted),
}
-impl Sequenced for Kind {
+impl Sequenced for Event {
fn instant(&self) -> Instant {
match self {
Self::Sent(sent) => sent.instant(),
@@ -44,7 +29,7 @@ impl Sequenced for Sent {
}
}
-impl From<Sent> for Kind {
+impl From<Sent> for Event {
fn from(event: Sent) -> Self {
Self::Sent(event)
}
@@ -54,8 +39,7 @@ impl From<Sent> for Kind {
pub struct Deleted {
#[serde(flatten)]
pub instant: Instant,
- pub channel: Channel,
- pub message: Id,
+ pub id: Id,
}
impl Sequenced for Deleted {
@@ -64,7 +48,7 @@ impl Sequenced for Deleted {
}
}
-impl From<Deleted> for Kind {
+impl From<Deleted> for Event {
fn from(event: Deleted) -> Self {
Self::Deleted(event)
}
diff --git a/src/message/history.rs b/src/message/history.rs
index f267f4c..09e69b7 100644
--- a/src/message/history.rs
+++ b/src/message/history.rs
@@ -35,22 +35,19 @@ impl History {
// Events interface
impl History {
fn sent(&self) -> Event {
- Event {
- kind: Sent {
- message: self.message.clone(),
- }
- .into(),
+ Sent {
+ message: self.message.clone(),
}
+ .into()
}
fn deleted(&self) -> Option<Event> {
- self.deleted.map(|instant| Event {
- kind: Deleted {
+ self.deleted.map(|instant| {
+ Deleted {
instant,
- channel: self.message.channel.clone(),
- message: self.message.id.clone(),
+ id: self.message.id.clone(),
}
- .into(),
+ .into()
})
}
diff --git a/src/message/repo.rs b/src/message/repo.rs
index 5b199a7..71c6d10 100644
--- a/src/message/repo.rs
+++ b/src/message/repo.rs
@@ -2,7 +2,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use super::{snapshot::Message, History, Id};
use crate::{
- channel::{self, Channel},
+ channel,
clock::DateTime,
event::{Instant, ResumePoint, Sequence},
login::{self, Login},
@@ -38,6 +38,10 @@ impl<'c> Messages<'c> {
values ($1, $2, $3, $4, $5, $6)
returning
id as "id: Id",
+ channel as "channel: channel::Id",
+ sender as "sender: login::Id",
+ sent_at as "sent_at: DateTime",
+ sent_sequence as "sent_sequence: Sequence",
body
"#,
id,
@@ -49,12 +53,12 @@ impl<'c> Messages<'c> {
)
.map(|row| History {
message: Message {
- sent: *sent,
- // Use "as created" here as we don't care about providing a perfectly up-to-date
- // representation of the channel. The `name` is informational (and the ID, which is
- // normative, is fixed over time).
- channel: channel.as_created(),
- sender: sender.clone(),
+ sent: Instant {
+ at: row.sent_at,
+ sequence: row.sent_sequence,
+ },
+ channel: row.channel,
+ sender: row.sender,
id: row.id,
body: row.body,
},
@@ -75,20 +79,16 @@ impl<'c> Messages<'c> {
let messages = sqlx::query!(
r#"
select
- channel.id as "channel_id: channel::Id",
- channel.name as "channel_name",
- sender.id as "sender_id: login::Id",
- sender.name as "sender_name",
- message.id as "id: Id",
- message.body,
+ channel as "channel: channel::Id",
+ sender as "sender: login::Id",
+ id as "id: Id",
+ body,
sent_at as "sent_at: DateTime",
sent_sequence as "sent_sequence: Sequence"
from message
- join channel on message.channel = channel.id
- join login as sender on message.sender = sender.id
- where channel.id = $1
- and coalesce(message.sent_sequence <= $2, true)
- order by message.sent_sequence
+ where channel = $1
+ and coalesce(sent_sequence <= $2, true)
+ order by sent_sequence
"#,
channel_id,
resume_at,
@@ -99,14 +99,43 @@ impl<'c> Messages<'c> {
at: row.sent_at,
sequence: row.sent_sequence,
},
- channel: Channel {
- id: row.channel_id,
- name: row.channel_name,
- },
- sender: Login {
- id: row.sender_id,
- name: row.sender_name,
+ channel: row.channel,
+ sender: row.sender,
+ id: row.id,
+ body: row.body,
+ },
+ deleted: None,
+ })
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(messages)
+ }
+
+ pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> {
+ let messages = sqlx::query!(
+ r#"
+ select
+ channel as "channel: channel::Id",
+ sender as "sender: login::Id",
+ id as "id: Id",
+ body,
+ sent_at as "sent_at: DateTime",
+ sent_sequence as "sent_sequence: Sequence"
+ from message
+ where coalesce(sent_sequence <= $2, true)
+ order by sent_sequence
+ "#,
+ resume_at,
+ )
+ .map(|row| History {
+ message: Message {
+ sent: Instant {
+ at: row.sent_at,
+ sequence: row.sent_sequence,
},
+ channel: row.channel,
+ sender: row.sender,
id: row.id,
body: row.body,
},
@@ -122,18 +151,14 @@ impl<'c> Messages<'c> {
let message = sqlx::query!(
r#"
select
- channel.id as "channel_id: channel::Id",
- channel.name as "channel_name",
- sender.id as "sender_id: login::Id",
- sender.name as "sender_name",
- message.id as "id: Id",
- message.body,
+ channel as "channel: channel::Id",
+ sender as "sender: login::Id",
+ id as "id: Id",
+ body,
sent_at as "sent_at: DateTime",
sent_sequence as "sent_sequence: Sequence"
from message
- join channel on message.channel = channel.id
- join login as sender on message.sender = sender.id
- where message.id = $1
+ where id = $1
"#,
message,
)
@@ -143,14 +168,8 @@ impl<'c> Messages<'c> {
at: row.sent_at,
sequence: row.sent_sequence,
},
- channel: Channel {
- id: row.channel_id,
- name: row.channel_name,
- },
- sender: Login {
- id: row.sender_id,
- name: row.sender_name,
- },
+ channel: row.channel,
+ sender: row.sender,
id: row.id,
body: row.body,
},
@@ -207,17 +226,13 @@ impl<'c> Messages<'c> {
let messages = sqlx::query!(
r#"
select
- channel.id as "channel_id: channel::Id",
- channel.name as "channel_name",
- sender.id as "sender_id: login::Id",
- sender.name as "sender_name",
- message.id as "id: Id",
- message.body,
+ channel as "channel: channel::Id",
+ sender as "sender: login::Id",
+ id as "id: Id",
+ body,
sent_at as "sent_at: DateTime",
sent_sequence as "sent_sequence: Sequence"
from message
- join channel on message.channel = channel.id
- join login as sender on message.sender = sender.id
where coalesce(message.sent_sequence > $1, true)
"#,
resume_at,
@@ -228,14 +243,8 @@ impl<'c> Messages<'c> {
at: row.sent_at,
sequence: row.sent_sequence,
},
- channel: Channel {
- id: row.channel_id,
- name: row.channel_name,
- },
- sender: Login {
- id: row.sender_id,
- name: row.sender_name,
- },
+ channel: row.channel,
+ sender: row.sender,
id: row.id,
body: row.body,
},
diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs
index 522c1aa..0eb37bb 100644
--- a/src/message/snapshot.rs
+++ b/src/message/snapshot.rs
@@ -1,57 +1,24 @@
use super::{
- event::{Event, Kind, Sent},
+ event::{Event, Sent},
Id,
};
-use crate::{channel::Channel, event::Instant, login::Login};
+use crate::{channel, event::Instant, login};
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-#[serde(into = "self::serialize::Message")]
pub struct Message {
- #[serde(skip)]
+ #[serde(flatten)]
pub sent: Instant,
- pub channel: Channel,
- pub sender: Login,
+ pub channel: channel::Id,
+ pub sender: login::Id,
pub id: Id,
pub body: String,
}
-mod serialize {
- use crate::{channel::Channel, login::Login, message::Id};
-
- #[derive(serde::Serialize)]
- pub struct Message {
- channel: Channel,
- sender: Login,
- #[allow(clippy::struct_field_names)]
- // Deliberately redundant with the module path; this produces a specific serialization.
- message: MessageData,
- }
-
- #[derive(serde::Serialize)]
- pub struct MessageData {
- id: Id,
- body: String,
- }
-
- impl From<super::Message> for Message {
- fn from(message: super::Message) -> Self {
- Self {
- channel: message.channel,
- sender: message.sender,
- message: MessageData {
- id: message.id,
- body: message.body,
- },
- }
- }
- }
-}
-
impl Message {
fn apply(state: Option<Self>, event: Event) -> Option<Self> {
- match (state, event.kind) {
- (None, Kind::Sent(event)) => Some(event.into()),
- (Some(message), Kind::Deleted(event)) if message.id == event.message => None,
+ match (state, event) {
+ (None, Event::Sent(event)) => Some(event.into()),
+ (Some(message), Event::Deleted(event)) if message.id == event.id => None,
(state, event) => panic!("invalid message event {event:#?} for state {state:#?}"),
}
}
diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs
index 09f0490..7fe2bf3 100644
--- a/src/test/fixtures/event.rs
+++ b/src/test/fixtures/event.rs
@@ -1,11 +1,11 @@
use crate::{
- event::{Event, Kind},
- message::Message,
+ event::Event,
+ message::{Event::Sent, Message},
};
pub fn message_sent(event: &Event, message: &Message) -> bool {
matches!(
- &event.kind,
- Kind::MessageSent(event) if message == &event.into()
+ &event,
+ Event::Message(Sent(event)) if message == &event.into()
)
}
diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs
index 6e62aea..84d27b0 100644
--- a/src/test/fixtures/filter.rs
+++ b/src/test/fixtures/filter.rs
@@ -1,11 +1,11 @@
use futures::future;
-use crate::event::{Event, Kind};
+use crate::{channel::Event::Created, event::Event, message::Event::Sent};
pub fn messages() -> impl FnMut(&Event) -> future::Ready<bool> {
- |event| future::ready(matches!(event.kind, Kind::MessageSent(_)))
+ |event| future::ready(matches!(event, Event::Message(Sent(_))))
}
pub fn created() -> impl FnMut(&Event) -> future::Ready<bool> {
- |event| future::ready(matches!(event.kind, Kind::ChannelCreated(_)))
+ |event| future::ready(matches!(event, Event::Channel(Created(_))))
}
diff --git a/src/test/fixtures/login.rs b/src/test/fixtures/login.rs
index 00c2789..e5ac716 100644
--- a/src/test/fixtures/login.rs
+++ b/src/test/fixtures/login.rs
@@ -3,23 +3,24 @@ use uuid::Uuid;
use crate::{
app::App,
+ clock::RequestedAt,
login::{self, Login, Password},
};
-pub async fn create_with_password(app: &App) -> (String, Password) {
+pub async fn create_with_password(app: &App, created_at: &RequestedAt) -> (String, Password) {
let (name, password) = propose();
app.logins()
- .create(&name, &password)
+ .create(&name, &password, created_at)
.await
.expect("should always succeed if the login is actually new");
(name, password)
}
-pub async fn create(app: &App) -> Login {
+pub async fn create(app: &App, created_at: &RequestedAt) -> Login {
let (name, password) = propose();
app.logins()
- .create(&name, &password)
+ .create(&name, &password, created_at)
.await
.expect("should always succeed if the login is actually new")
}
diff --git a/src/token/app.rs b/src/token/app.rs
index 5c4fcd5..b8af637 100644
--- a/src/token/app.rs
+++ b/src/token/app.rs
@@ -7,23 +7,34 @@ use futures::{
use sqlx::sqlite::SqlitePool;
use super::{
- broadcaster::Broadcaster, event, repo::auth::Provider as _, repo::Provider as _, Id, Secret,
+ repo::auth::Provider as _, repo::Provider as _, Broadcaster, Event as TokenEvent, Id, Secret,
};
use crate::{
clock::DateTime,
db::NotFound as _,
+ event::{self, repo::Provider as _, Event as ServiceEvent},
login::{repo::Provider as _, Login, Password},
};
pub struct Tokens<'a> {
db: &'a SqlitePool,
- tokens: &'a Broadcaster,
+ events: &'a event::Broadcaster,
+ token_events: &'a Broadcaster,
}
impl<'a> Tokens<'a> {
- pub const fn new(db: &'a SqlitePool, tokens: &'a Broadcaster) -> Self {
- Self { db, tokens }
+ pub const fn new(
+ db: &'a SqlitePool,
+ events: &'a event::Broadcaster,
+ token_events: &'a Broadcaster,
+ ) -> Self {
+ Self {
+ db,
+ events,
+ token_events,
+ }
}
+
pub async fn login(
&self,
name: &str,
@@ -32,22 +43,30 @@ impl<'a> Tokens<'a> {
) -> Result<Secret, LoginError> {
let mut tx = self.db.begin().await?;
- let login = if let Some((login, stored_hash)) = tx.auth().for_name(name).await? {
+ let (login, created) = if let Some((login, stored_hash)) = tx.auth().for_name(name).await? {
if stored_hash.verify(password)? {
- // Password verified; use the login.
- login
+ // Password verified, proceed with login
+ (login, false)
} else {
// Password NOT verified.
return Err(LoginError::Rejected);
}
} else {
let password_hash = password.hash()?;
- tx.logins().create(name, &password_hash).await?
+ let created = tx.sequence().next(login_at).await?;
+ let login = tx.logins().create(name, &password_hash, &created).await?;
+
+ (login, true)
};
let token = tx.tokens().issue(&login, login_at).await?;
tx.commit().await?;
+ if created {
+ self.events
+ .broadcast(login.events().map(ServiceEvent::from).collect::<Vec<_>>());
+ }
+
Ok(token)
}
@@ -76,7 +95,7 @@ impl<'a> Tokens<'a> {
E: std::fmt::Debug,
{
// Subscribe, first.
- let token_events = self.tokens.subscribe();
+ let token_events = self.token_events.subscribe();
// Check that the token is valid at this point in time, second. If it is, then
// any future revocations will appear in the subscription. If not, bail now.
@@ -102,7 +121,9 @@ impl<'a> Tokens<'a> {
// Then construct the guarded stream. First, project both streams into
// `GuardedEvent`.
let token_events = token_events
- .filter(move |event| future::ready(event.token == token))
+ .filter(move |event| {
+ future::ready(matches!(event, TokenEvent::Revoked(id) if id == &token))
+ })
.map(|_| GuardedEvent::TokenRevoked);
let events = events.map(|event| GuardedEvent::Event(event));
@@ -126,8 +147,8 @@ impl<'a> Tokens<'a> {
let tokens = tx.tokens().expire(&expire_at).await?;
tx.commit().await?;
- for event in tokens.into_iter().map(event::TokenRevoked::from) {
- self.tokens.broadcast(event);
+ for event in tokens.into_iter().map(TokenEvent::Revoked) {
+ self.token_events.broadcast(event);
}
Ok(())
@@ -138,8 +159,8 @@ impl<'a> Tokens<'a> {
tx.tokens().revoke(token).await?;
tx.commit().await?;
- self.tokens
- .broadcast(event::TokenRevoked::from(token.clone()));
+ self.token_events
+ .broadcast(TokenEvent::Revoked(token.clone()));
Ok(())
}
diff --git a/src/token/broadcaster.rs b/src/token/broadcaster.rs
index 8e2e006..de2513a 100644
--- a/src/token/broadcaster.rs
+++ b/src/token/broadcaster.rs
@@ -1,4 +1,3 @@
-use super::event;
use crate::broadcast;
-pub type Broadcaster = broadcast::Broadcaster<event::TokenRevoked>;
+pub type Broadcaster = broadcast::Broadcaster<super::Event>;
diff --git a/src/token/event.rs b/src/token/event.rs
index d53d436..51b74d7 100644
--- a/src/token/event.rs
+++ b/src/token/event.rs
@@ -1,12 +1,6 @@
use crate::token;
#[derive(Clone, Debug)]
-pub struct TokenRevoked {
- pub token: token::Id,
-}
-
-impl From<token::Id> for TokenRevoked {
- fn from(token: token::Id) -> Self {
- Self { token }
- }
+pub enum Event {
+ Revoked(token::Id),
}
diff --git a/src/token/mod.rs b/src/token/mod.rs
index d122611..eccb3cd 100644
--- a/src/token/mod.rs
+++ b/src/token/mod.rs
@@ -1,9 +1,9 @@
pub mod app;
-pub mod broadcaster;
+mod broadcaster;
mod event;
pub mod extract;
mod id;
mod repo;
mod secret;
-pub use self::{id::Id, secret::Secret};
+pub use self::{broadcaster::Broadcaster, event::Event, id::Id, secret::Secret};
diff --git a/src/token/repo/auth.rs b/src/token/repo/auth.rs
index b299697..ddb5136 100644
--- a/src/token/repo/auth.rs
+++ b/src/token/repo/auth.rs
@@ -1,6 +1,10 @@
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
-use crate::login::{self, password::StoredHash, Login};
+use crate::{
+ clock::DateTime,
+ event::{Instant, Sequence},
+ login::{self, password::StoredHash, History, Login},
+};
pub trait Provider {
fn auth(&mut self) -> Auth;
@@ -21,25 +25,33 @@ impl<'t> Auth<'t> {
pub async fn for_name(
&mut self,
name: &str,
- ) -> Result<Option<(Login, StoredHash)>, sqlx::Error> {
+ ) -> Result<Option<(History, StoredHash)>, sqlx::Error> {
let found = sqlx::query!(
r#"
select
id as "id: login::Id",
name,
- password_hash as "password_hash: StoredHash"
+ password_hash as "password_hash: StoredHash",
+ created_sequence as "created_sequence: Sequence",
+ created_at as "created_at: DateTime"
from login
where name = $1
"#,
name,
)
- .map(|rec| {
+ .map(|row| {
(
- Login {
- id: rec.id,
- name: rec.name,
+ History {
+ login: Login {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
},
- rec.password_hash,
+ row.password_hash,
)
})
.fetch_optional(&mut *self.0)
diff --git a/src/token/repo/token.rs b/src/token/repo/token.rs
index 5f64dac..c592dcd 100644
--- a/src/token/repo/token.rs
+++ b/src/token/repo/token.rs
@@ -3,7 +3,7 @@ use uuid::Uuid;
use crate::{
clock::DateTime,
- login::{self, Login},
+ login::{self, History, Login},
token::{Id, Secret},
};
@@ -24,11 +24,12 @@ impl<'c> Tokens<'c> {
// be used to control expiry, until the token is actually used.
pub async fn issue(
&mut self,
- login: &Login,
+ login: &History,
issued_at: &DateTime,
) -> Result<Secret, sqlx::Error> {
let id = Id::generate();
let secret = Uuid::new_v4().to_string();
+ let login = login.id();
let secret = sqlx::query_scalar!(
r#"
@@ -39,7 +40,7 @@ impl<'c> Tokens<'c> {
"#,
id,
secret,
- login.id,
+ login,
issued_at,
)
.fetch_one(&mut *self.0)
@@ -127,7 +128,7 @@ impl<'c> Tokens<'c> {
select
token.id as "token_id: Id",
login.id as "login_id: login::Id",
- name as "login_name"
+ login.name as "login_name"
from login
join token on login.id = token.login
where token.secret = $1