From 5af4aea1e2f143499529b70f9cf191c6994265c6 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Mon, 23 Jun 2025 21:28:36 -0400 Subject: Rename "channel" to "conversation" in the database. I've - somewhat arbitrarily - started renaming column aliases, as well, though the corresponding Rust data model, API fields and nouns, and client code still references them as "channel" (or as derived terms). As with so many schema changes, this entails a complete rebuild of a substantial portion of the schema. sqlite3 still doesn't have very many `alter table` primitives, for renaming columns in particular. --- src/channel/repo.rs | 75 ++++++++++++++++++++++++++--------------------------- src/message/repo.rs | 28 ++++++++++---------- 2 files changed, 51 insertions(+), 52 deletions(-) (limited to 'src') diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 812a259..fd2173a 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -31,8 +31,7 @@ impl Channels<'_> { sqlx::query!( r#" - insert - into channel (id, created_at, created_sequence, last_sequence) + insert into conversation (id, created_at, created_sequence, last_sequence) values ($1, $2, $3, $4) "#, id, @@ -45,7 +44,7 @@ impl Channels<'_> { sqlx::query!( r#" - insert into channel_name (id, display_name, canonical_name) + insert into conversation_name (id, display_name, canonical_name) values ($1, $2, $3) "#, id, @@ -75,14 +74,14 @@ impl Channels<'_> { id as "id: Id", name.display_name as "display_name?: String", name.canonical_name as "canonical_name?: String", - channel.created_at as "created_at: DateTime", - channel.created_sequence as "created_sequence: Sequence", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" - from channel - left join channel_name as name + from conversation + left join conversation_name as name using (id) - left join channel_deleted as deleted + left join conversation_deleted as deleted using (id) where id = $1 "#, @@ -112,16 +111,16 @@ impl Channels<'_> { id as "id: Id", name.display_name as "display_name?: String", name.canonical_name as "canonical_name?: String", - channel.created_at as "created_at: DateTime", - channel.created_sequence as "created_sequence: Sequence", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" - from channel - left join channel_name as name + from conversation + left join conversation_name as name using (id) - left join channel_deleted as deleted + left join conversation_deleted as deleted using (id) - where channel.created_sequence <= $1 + where conversation.created_sequence <= $1 order by name.canonical_name "#, resume_at, @@ -152,16 +151,16 @@ impl Channels<'_> { id as "id: Id", name.display_name as "display_name?: String", name.canonical_name as "canonical_name?: String", - channel.created_at as "created_at: DateTime", - channel.created_sequence as "created_sequence: Sequence", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" - from channel - left join channel_name as name + from conversation + left join conversation_name as name using (id) - left join channel_deleted as deleted + left join conversation_deleted as deleted using (id) - where channel.last_sequence > $1 + where conversation.last_sequence > $1 "#, resume_at, ) @@ -192,7 +191,7 @@ impl Channels<'_> { let id = channel.id(); sqlx::query!( r#" - update channel + update conversation set last_sequence = max(last_sequence, $1) where id = $2 returning id as "id: Id" @@ -205,7 +204,7 @@ impl Channels<'_> { sqlx::query!( r#" - insert into channel_deleted (id, deleted_at, deleted_sequence) + insert into conversation_deleted (id, deleted_at, deleted_sequence) values ($1, $2, $3) "#, id, @@ -215,9 +214,9 @@ impl Channels<'_> { .execute(&mut *self.0) .await?; - // Small social responsibility hack here: when a channel is deleted, its name is - // retconned to have been the empty string. Someone reading the event stream - // afterwards, or looking at channels via the API, cannot retrieve the + // Small social responsibility hack here: when a conversation is deleted, its + // name is retconned to have been the empty string. Someone reading the event + // stream afterwards, or looking at channels via the API, cannot retrieve the // "deleted" channel's information by ignoring the deletion event. // // This also avoids the need for a separate name reservation table to ensure @@ -225,7 +224,7 @@ impl Channels<'_> { // is unique over non-null values. sqlx::query!( r#" - delete from channel_name + delete from conversation_name where id = $1 "#, id, @@ -242,11 +241,11 @@ impl Channels<'_> { let channels = sqlx::query_scalar!( r#" with has_messages as ( - select channel + select conversation from message - group by channel + group by conversation ) - delete from channel_deleted + delete from conversation_deleted where deleted_at < $1 and id not in has_messages returning id as "id: Id" @@ -260,7 +259,7 @@ impl Channels<'_> { // Wanted: a way to batch these up into one query. sqlx::query!( r#" - delete from channel + delete from conversation where id = $1 "#, channel, @@ -276,21 +275,21 @@ impl Channels<'_> { let channels = sqlx::query!( r#" select - channel.id as "id: Id", + conversation.id as "id: Id", name.display_name as "display_name?: String", name.canonical_name as "canonical_name?: String", - channel.created_at as "created_at: DateTime", - channel.created_sequence as "created_sequence: Sequence", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" - from channel - left join channel_name as name + from conversation + left join conversation_name as name using (id) - left join channel_deleted as deleted + left join conversation_deleted as deleted using (id) left join message - on channel.id = message.channel - where channel.created_at < $1 + on conversation.id = message.conversation + where conversation.created_at < $1 and message.id is null and deleted.id is null "#, diff --git a/src/message/repo.rs b/src/message/repo.rs index e753134..159ce8e 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -33,11 +33,11 @@ impl Messages<'_> { let message = sqlx::query!( r#" insert into message - (id, channel, sender, sent_at, sent_sequence, body, last_sequence) + (id, conversation, sender, sent_at, sent_sequence, body, last_sequence) values ($1, $2, $3, $4, $5, $6, $7) returning id as "id: Id", - channel as "channel: channel::Id", + conversation as "conversation: channel::Id", sender as "sender: user::Id", sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence", @@ -54,7 +54,7 @@ impl Messages<'_> { .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), - channel: row.channel, + channel: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), @@ -73,7 +73,7 @@ impl Messages<'_> { let messages = sqlx::query!( r#" select - message.channel as "channel: channel::Id", + message.conversation as "conversation: channel::Id", message.sender as "sender: user::Id", id as "id: Id", message.body as "body: Body", @@ -84,7 +84,7 @@ impl Messages<'_> { from message left join message_deleted as deleted using (id) - where message.channel = $1 + where message.conversation = $1 and deleted.id is null "#, channel_id, @@ -92,7 +92,7 @@ impl Messages<'_> { .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), - channel: row.channel, + channel: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), @@ -110,7 +110,7 @@ impl Messages<'_> { let messages = sqlx::query!( r#" select - message.channel as "channel: channel::Id", + message.conversation as "conversation: channel::Id", message.sender as "sender: user::Id", message.id as "id: Id", message.body as "body: Body", @@ -129,7 +129,7 @@ impl Messages<'_> { .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), - channel: row.channel, + channel: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), @@ -147,7 +147,7 @@ impl Messages<'_> { let message = sqlx::query!( r#" select - message.channel as "channel: channel::Id", + message.conversation as "conversation: channel::Id", message.sender as "sender: user::Id", id as "id: Id", message.body as "body: Body", @@ -165,7 +165,7 @@ impl Messages<'_> { .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), - channel: row.channel, + channel: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), @@ -252,7 +252,7 @@ impl Messages<'_> { r#" select id as "id: Id", - message.channel as "channel: channel::Id", + message.conversation as "conversation: channel::Id", message.sender as "sender: user::Id", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", @@ -271,7 +271,7 @@ impl Messages<'_> { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), id: row.id, - channel: row.channel, + channel: row.conversation, sender: row.sender, body: row.body.unwrap_or_default(), deleted_at: row.deleted_at, @@ -289,7 +289,7 @@ impl Messages<'_> { r#" select id as "id: Id", - message.channel as "channel: channel::Id", + message.conversation as "conversation: channel::Id", message.sender as "sender: user::Id", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", @@ -306,7 +306,7 @@ impl Messages<'_> { .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), - channel: row.channel, + channel: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), -- cgit v1.2.3 From a15e3d580124f561864c6a39f1e035eb1b3aab13 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Mon, 30 Jun 2025 22:00:57 -0400 Subject: Rename "channel" to "conversation" within the server. I've split this from the schema and API changes because, frankly, it's huge. Annoyingly so. There are no semantic changes in this, it's all symbol changes, but there are a _lot_ of them because the term "channel" leaks all over everything in a service whose primary role is managing messages sent to channels (now, conversations). I found a buggy test while working on this! It's not fixed in this commit, because it felt mean to hide a real change in the middle of this much chaff. --- ...eb722b56858a0c2aa7eedf4cc5824ea61da504f2f7.json | 50 --- ...f750c807319b2c1d3069f9186ccaf722904310cf3d.json | 62 ---- ...7723b166dc49bfc1449fc3551faa376e303136a988.json | 62 ++++ ...d3c9d359549114abf305116ecbc046d53ea4382966.json | 62 ++++ ...d980b7f8cbc8a8339377814ca5f889860da99b1561.json | 50 +++ ...43914eceed46ca25ec1d9c91127c4134ccbd471aa0.json | 62 ++++ ...e3c07e457b5b21805257cd117f7d1833491ee41312.json | 62 ++++ ...665e297e9ff65546b87e6667ad68bbc162a3e464c4.json | 62 ---- ...f69141a2e9dd3efaa0f1911765e73d1c00b34ddbaa.json | 62 ---- ...487bd9d1179bbf8c33398ee7b9af2ea265c4b6539c.json | 62 ---- ...5f016148eb69fd5ee077567bbc0dd49acf69a3b2b2.json | 62 ++++ ...bff7f0c48a70d61597e2f284bf29771a9c3c2615a9.json | 62 ---- src/app.rs | 6 +- src/boot/app.rs | 16 +- src/boot/handlers/boot/test.rs | 81 ++--- src/channel/app.rs | 224 -------------- src/channel/event.rs | 46 --- src/channel/handlers/create/mod.rs | 67 ---- src/channel/handlers/create/test.rs | 250 --------------- src/channel/handlers/delete/mod.rs | 59 ---- src/channel/handlers/delete/test.rs | 184 ----------- src/channel/handlers/mod.rs | 9 - src/channel/handlers/send/mod.rs | 63 ---- src/channel/handlers/send/test.rs | 130 -------- src/channel/history.rs | 69 ----- src/channel/id.rs | 38 --- src/channel/mod.rs | 10 - src/channel/repo.rs | 336 --------------------- src/channel/snapshot.rs | 43 --- src/channel/validate.rs | 25 -- src/conversation/app.rs | 236 +++++++++++++++ src/conversation/event.rs | 46 +++ src/conversation/handlers/create/mod.rs | 67 ++++ src/conversation/handlers/create/test.rs | 250 +++++++++++++++ src/conversation/handlers/delete/mod.rs | 61 ++++ src/conversation/handlers/delete/test.rs | 184 +++++++++++ src/conversation/handlers/mod.rs | 9 + src/conversation/handlers/send/mod.rs | 63 ++++ src/conversation/handlers/send/test.rs | 130 ++++++++ src/conversation/history.rs | 69 +++++ src/conversation/id.rs | 38 +++ src/conversation/mod.rs | 10 + src/conversation/repo.rs | 332 ++++++++++++++++++++ src/conversation/snapshot.rs | 43 +++ src/conversation/validate.rs | 25 ++ src/event/app.rs | 16 +- src/event/handlers/stream/test/channel.rs | 273 ----------------- src/event/handlers/stream/test/conversation.rs | 273 +++++++++++++++++ src/event/handlers/stream/test/message.rs | 65 ++-- src/event/handlers/stream/test/mod.rs | 2 +- src/event/handlers/stream/test/resume.rs | 43 +-- src/event/handlers/stream/test/token.rs | 24 +- src/event/mod.rs | 8 +- src/expire.rs | 6 +- src/lib.rs | 2 +- src/message/app.rs | 36 ++- src/message/handlers/delete/test.rs | 21 +- src/message/repo.rs | 29 +- src/message/snapshot.rs | 4 +- src/routes.rs | 16 +- src/test/fixtures/channel.rs | 38 --- src/test/fixtures/conversation.rs | 38 +++ src/test/fixtures/event/mod.rs | 6 +- src/test/fixtures/event/stream.rs | 14 +- src/test/fixtures/message.rs | 13 +- src/test/fixtures/mod.rs | 2 +- src/ui/handlers/channel.rs | 58 ---- src/ui/handlers/conversation.rs | 61 ++++ src/ui/handlers/mod.rs | 4 +- 69 files changed, 2515 insertions(+), 2476 deletions(-) delete mode 100644 .sqlx/query-008401c1a1988a925d76aeeb722b56858a0c2aa7eedf4cc5824ea61da504f2f7.json delete mode 100644 .sqlx/query-0795c8c7f01f67cc05e1d0f750c807319b2c1d3069f9186ccaf722904310cf3d.json create mode 100644 .sqlx/query-3aa6bb5450cfc6b4b1acfc7723b166dc49bfc1449fc3551faa376e303136a988.json create mode 100644 .sqlx/query-3ea2e910657ae0045eba8ed3c9d359549114abf305116ecbc046d53ea4382966.json create mode 100644 .sqlx/query-427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561.json create mode 100644 .sqlx/query-508cc72b2bdc712e66c43643914eceed46ca25ec1d9c91127c4134ccbd471aa0.json create mode 100644 .sqlx/query-5d25f783816c1b10750e2de3c07e457b5b21805257cd117f7d1833491ee41312.json delete mode 100644 .sqlx/query-a781f6b9ab38ade2a0b15d665e297e9ff65546b87e6667ad68bbc162a3e464c4.json delete mode 100644 .sqlx/query-c4f0bbdcfdd1d88def0f19f69141a2e9dd3efaa0f1911765e73d1c00b34ddbaa.json delete mode 100644 .sqlx/query-e9689d49d357b1df287bc2487bd9d1179bbf8c33398ee7b9af2ea265c4b6539c.json create mode 100644 .sqlx/query-f13957b52b93cf6a1c1bd85f016148eb69fd5ee077567bbc0dd49acf69a3b2b2.json delete mode 100644 .sqlx/query-f796c7c4d16a7bda355c5cbff7f0c48a70d61597e2f284bf29771a9c3c2615a9.json delete mode 100644 src/channel/app.rs delete mode 100644 src/channel/event.rs delete mode 100644 src/channel/handlers/create/mod.rs delete mode 100644 src/channel/handlers/create/test.rs delete mode 100644 src/channel/handlers/delete/mod.rs delete mode 100644 src/channel/handlers/delete/test.rs delete mode 100644 src/channel/handlers/mod.rs delete mode 100644 src/channel/handlers/send/mod.rs delete mode 100644 src/channel/handlers/send/test.rs delete mode 100644 src/channel/history.rs delete mode 100644 src/channel/id.rs delete mode 100644 src/channel/mod.rs delete mode 100644 src/channel/repo.rs delete mode 100644 src/channel/snapshot.rs delete mode 100644 src/channel/validate.rs create mode 100644 src/conversation/app.rs create mode 100644 src/conversation/event.rs create mode 100644 src/conversation/handlers/create/mod.rs create mode 100644 src/conversation/handlers/create/test.rs create mode 100644 src/conversation/handlers/delete/mod.rs create mode 100644 src/conversation/handlers/delete/test.rs create mode 100644 src/conversation/handlers/mod.rs create mode 100644 src/conversation/handlers/send/mod.rs create mode 100644 src/conversation/handlers/send/test.rs create mode 100644 src/conversation/history.rs create mode 100644 src/conversation/id.rs create mode 100644 src/conversation/mod.rs create mode 100644 src/conversation/repo.rs create mode 100644 src/conversation/snapshot.rs create mode 100644 src/conversation/validate.rs delete mode 100644 src/event/handlers/stream/test/channel.rs create mode 100644 src/event/handlers/stream/test/conversation.rs delete mode 100644 src/test/fixtures/channel.rs create mode 100644 src/test/fixtures/conversation.rs delete mode 100644 src/ui/handlers/channel.rs create mode 100644 src/ui/handlers/conversation.rs (limited to 'src') diff --git a/.sqlx/query-008401c1a1988a925d76aeeb722b56858a0c2aa7eedf4cc5824ea61da504f2f7.json b/.sqlx/query-008401c1a1988a925d76aeeb722b56858a0c2aa7eedf4cc5824ea61da504f2f7.json deleted file mode 100644 index f80d588..0000000 --- a/.sqlx/query-008401c1a1988a925d76aeeb722b56858a0c2aa7eedf4cc5824ea61da504f2f7.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert into message\n (id, conversation, sender, sent_at, sent_sequence, body, last_sequence)\n values ($1, $2, $3, $4, $5, $6, $7)\n returning\n id as \"id: Id\",\n conversation as \"conversation: channel::Id\",\n sender as \"sender: user::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n body as \"body: Body\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "conversation: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: user::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - } - ], - "parameters": { - "Right": 7 - }, - "nullable": [ - false, - false, - false, - false, - false, - true - ] - }, - "hash": "008401c1a1988a925d76aeeb722b56858a0c2aa7eedf4cc5824ea61da504f2f7" -} diff --git a/.sqlx/query-0795c8c7f01f67cc05e1d0f750c807319b2c1d3069f9186ccaf722904310cf3d.json b/.sqlx/query-0795c8c7f01f67cc05e1d0f750c807319b2c1d3069f9186ccaf722904310cf3d.json deleted file mode 100644 index 20d217e..0000000 --- a/.sqlx/query-0795c8c7f01f67cc05e1d0f750c807319b2c1d3069f9186ccaf722904310cf3d.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n message.conversation as \"conversation: channel::Id\",\n message.sender as \"sender: user::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.last_sequence > $1\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "conversation: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: user::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false, - true, - false, - false - ] - }, - "hash": "0795c8c7f01f67cc05e1d0f750c807319b2c1d3069f9186ccaf722904310cf3d" -} diff --git a/.sqlx/query-3aa6bb5450cfc6b4b1acfc7723b166dc49bfc1449fc3551faa376e303136a988.json b/.sqlx/query-3aa6bb5450cfc6b4b1acfc7723b166dc49bfc1449fc3551faa376e303136a988.json new file mode 100644 index 0000000..bcbb09c --- /dev/null +++ b/.sqlx/query-3aa6bb5450cfc6b4b1acfc7723b166dc49bfc1449fc3551faa376e303136a988.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n message.conversation as \"conversation: conversation::Id\",\n message.sender as \"sender: user::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.last_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "conversation: conversation::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: user::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + false, + false + ] + }, + "hash": "3aa6bb5450cfc6b4b1acfc7723b166dc49bfc1449fc3551faa376e303136a988" +} diff --git a/.sqlx/query-3ea2e910657ae0045eba8ed3c9d359549114abf305116ecbc046d53ea4382966.json b/.sqlx/query-3ea2e910657ae0045eba8ed3c9d359549114abf305116ecbc046d53ea4382966.json new file mode 100644 index 0000000..cd2c808 --- /dev/null +++ b/.sqlx/query-3ea2e910657ae0045eba8ed3c9d359549114abf305116ecbc046d53ea4382966.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.conversation as \"conversation: conversation::Id\",\n message.sender as \"sender: user::Id\",\n id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.conversation = $1\n and deleted.id is null\n ", + "describe": { + "columns": [ + { + "name": "conversation: conversation::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender: user::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body: Body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 5, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + true, + false, + false, + false, + false + ] + }, + "hash": "3ea2e910657ae0045eba8ed3c9d359549114abf305116ecbc046d53ea4382966" +} diff --git a/.sqlx/query-427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561.json b/.sqlx/query-427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561.json new file mode 100644 index 0000000..82db559 --- /dev/null +++ b/.sqlx/query-427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561.json @@ -0,0 +1,50 @@ +{ + "db_name": "SQLite", + "query": "\n insert into message\n (id, conversation, sender, sent_at, sent_sequence, body, last_sequence)\n values ($1, $2, $3, $4, $5, $6, $7)\n returning\n id as \"id: Id\",\n conversation as \"conversation: conversation::Id\",\n sender as \"sender: user::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n body as \"body: Body\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "conversation: conversation::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: user::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + } + ], + "parameters": { + "Right": 7 + }, + "nullable": [ + false, + false, + false, + false, + false, + true + ] + }, + "hash": "427a530f68282ba586c1e2d980b7f8cbc8a8339377814ca5f889860da99b1561" +} diff --git a/.sqlx/query-508cc72b2bdc712e66c43643914eceed46ca25ec1d9c91127c4134ccbd471aa0.json b/.sqlx/query-508cc72b2bdc712e66c43643914eceed46ca25ec1d9c91127c4134ccbd471aa0.json new file mode 100644 index 0000000..a95038e --- /dev/null +++ b/.sqlx/query-508cc72b2bdc712e66c43643914eceed46ca25ec1d9c91127c4134ccbd471aa0.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.conversation as \"conversation: conversation::Id\",\n message.sender as \"sender: user::Id\",\n message.id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence <= $1\n order by message.sent_sequence\n ", + "describe": { + "columns": [ + { + "name": "conversation: conversation::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender: user::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body: Body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 5, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + true, + false, + false, + false, + false + ] + }, + "hash": "508cc72b2bdc712e66c43643914eceed46ca25ec1d9c91127c4134ccbd471aa0" +} diff --git a/.sqlx/query-5d25f783816c1b10750e2de3c07e457b5b21805257cd117f7d1833491ee41312.json b/.sqlx/query-5d25f783816c1b10750e2de3c07e457b5b21805257cd117f7d1833491ee41312.json new file mode 100644 index 0000000..f2a9f36 --- /dev/null +++ b/.sqlx/query-5d25f783816c1b10750e2de3c07e457b5b21805257cd117f7d1833491ee41312.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.conversation as \"conversation: conversation::Id\",\n message.sender as \"sender: user::Id\",\n id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where id = $1\n ", + "describe": { + "columns": [ + { + "name": "conversation: conversation::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender: user::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body: Body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 5, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + true, + false, + false, + false, + false + ] + }, + "hash": "5d25f783816c1b10750e2de3c07e457b5b21805257cd117f7d1833491ee41312" +} diff --git a/.sqlx/query-a781f6b9ab38ade2a0b15d665e297e9ff65546b87e6667ad68bbc162a3e464c4.json b/.sqlx/query-a781f6b9ab38ade2a0b15d665e297e9ff65546b87e6667ad68bbc162a3e464c4.json deleted file mode 100644 index 7e4a102..0000000 --- a/.sqlx/query-a781f6b9ab38ade2a0b15d665e297e9ff65546b87e6667ad68bbc162a3e464c4.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n message.conversation as \"conversation: channel::Id\",\n message.sender as \"sender: user::Id\",\n message.id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence <= $1\n order by message.sent_sequence\n ", - "describe": { - "columns": [ - { - "name": "conversation: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender: user::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "id: Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "body: Body", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 5, - "type_info": "Integer" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - true, - false, - false, - false, - false - ] - }, - "hash": "a781f6b9ab38ade2a0b15d665e297e9ff65546b87e6667ad68bbc162a3e464c4" -} diff --git a/.sqlx/query-c4f0bbdcfdd1d88def0f19f69141a2e9dd3efaa0f1911765e73d1c00b34ddbaa.json b/.sqlx/query-c4f0bbdcfdd1d88def0f19f69141a2e9dd3efaa0f1911765e73d1c00b34ddbaa.json deleted file mode 100644 index c4cb21b..0000000 --- a/.sqlx/query-c4f0bbdcfdd1d88def0f19f69141a2e9dd3efaa0f1911765e73d1c00b34ddbaa.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n message.conversation as \"conversation: channel::Id\",\n message.sender as \"sender: user::Id\",\n id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.conversation = $1\n and deleted.id is null\n ", - "describe": { - "columns": [ - { - "name": "conversation: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender: user::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "id: Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "body: Body", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 5, - "type_info": "Integer" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - true, - false, - false, - false, - false - ] - }, - "hash": "c4f0bbdcfdd1d88def0f19f69141a2e9dd3efaa0f1911765e73d1c00b34ddbaa" -} diff --git a/.sqlx/query-e9689d49d357b1df287bc2487bd9d1179bbf8c33398ee7b9af2ea265c4b6539c.json b/.sqlx/query-e9689d49d357b1df287bc2487bd9d1179bbf8c33398ee7b9af2ea265c4b6539c.json deleted file mode 100644 index 35dbbe3..0000000 --- a/.sqlx/query-e9689d49d357b1df287bc2487bd9d1179bbf8c33398ee7b9af2ea265c4b6539c.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n message.conversation as \"conversation: channel::Id\",\n message.sender as \"sender: user::Id\",\n id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where id = $1\n ", - "describe": { - "columns": [ - { - "name": "conversation: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender: user::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "id: Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "body: Body", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 5, - "type_info": "Integer" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - true, - false, - false, - false, - false - ] - }, - "hash": "e9689d49d357b1df287bc2487bd9d1179bbf8c33398ee7b9af2ea265c4b6539c" -} diff --git a/.sqlx/query-f13957b52b93cf6a1c1bd85f016148eb69fd5ee077567bbc0dd49acf69a3b2b2.json b/.sqlx/query-f13957b52b93cf6a1c1bd85f016148eb69fd5ee077567bbc0dd49acf69a3b2b2.json new file mode 100644 index 0000000..04298cc --- /dev/null +++ b/.sqlx/query-f13957b52b93cf6a1c1bd85f016148eb69fd5ee077567bbc0dd49acf69a3b2b2.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n message.conversation as \"conversation: conversation::Id\",\n message.sender as \"sender: user::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_at < $1\n and deleted.id is null\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "conversation: conversation::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: user::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + false, + false + ] + }, + "hash": "f13957b52b93cf6a1c1bd85f016148eb69fd5ee077567bbc0dd49acf69a3b2b2" +} diff --git a/.sqlx/query-f796c7c4d16a7bda355c5cbff7f0c48a70d61597e2f284bf29771a9c3c2615a9.json b/.sqlx/query-f796c7c4d16a7bda355c5cbff7f0c48a70d61597e2f284bf29771a9c3c2615a9.json deleted file mode 100644 index f194250..0000000 --- a/.sqlx/query-f796c7c4d16a7bda355c5cbff7f0c48a70d61597e2f284bf29771a9c3c2615a9.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n message.conversation as \"conversation: channel::Id\",\n message.sender as \"sender: user::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_at < $1\n and deleted.id is null\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "conversation: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: user::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false, - true, - false, - false - ] - }, - "hash": "f796c7c4d16a7bda355c5cbff7f0c48a70d61597e2f284bf29771a9c3c2615a9" -} diff --git a/src/app.rs b/src/app.rs index b7e52a4..133ee04 100644 --- a/src/app.rs +++ b/src/app.rs @@ -2,7 +2,7 @@ use sqlx::sqlite::SqlitePool; use crate::{ boot::app::Boot, - channel::app::Channels, + conversation::app::Conversations, event::{self, app::Events}, invite::app::Invites, message::app::Messages, @@ -37,8 +37,8 @@ impl App { Boot::new(&self.db) } - pub const fn channels(&self) -> Channels { - Channels::new(&self.db, &self.events) + pub const fn conversations(&self) -> Conversations { + Conversations::new(&self.db, &self.events) } pub const fn events(&self) -> Events { diff --git a/src/boot/app.rs b/src/boot/app.rs index 89eec12..0ed5d1b 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -3,7 +3,7 @@ use sqlx::sqlite::SqlitePool; use super::Snapshot; use crate::{ - channel::{self, repo::Provider as _}, + conversation::{self, repo::Provider as _}, event::{Event, Sequence, repo::Provider as _}, message::{self, repo::Provider as _}, name, @@ -24,7 +24,7 @@ impl<'a> Boot<'a> { let resume_point = tx.sequence().current().await?; let users = tx.users().all(resume_point).await?; - let channels = tx.channels().all(resume_point).await?; + let conversations = tx.conversations().all(resume_point).await?; let messages = tx.messages().all(resume_point).await?; tx.commit().await?; @@ -36,9 +36,9 @@ impl<'a> Boot<'a> { .filter(Sequence::up_to(resume_point)) .map(Event::from); - let channel_events = channels + let conversation_events = conversations .iter() - .map(channel::History::events) + .map(conversation::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::up_to(resume_point)) .map(Event::from); @@ -51,7 +51,7 @@ impl<'a> Boot<'a> { .map(Event::from); let events = user_events - .merge_by(channel_events, Sequence::merge) + .merge_by(conversation_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .collect(); @@ -79,9 +79,9 @@ impl From for Error { } } -impl From for Error { - fn from(error: channel::repo::LoadError) -> Self { - use channel::repo::LoadError; +impl From for Error { + fn from(error: conversation::repo::LoadError) -> Self { + use conversation::repo::LoadError; match error { LoadError::Name(error) => error.into(), LoadError::Database(error) => error.into(), diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs index 1e590a7..c7c511a 100644 --- a/src/boot/handlers/boot/test.rs +++ b/src/boot/handlers/boot/test.rs @@ -37,9 +37,9 @@ async fn includes_users() { } #[tokio::test] -async fn includes_channels() { +async fn includes_conversations() { let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let viewer = fixtures::identity::fictitious(); let response = super::handler(State(app), viewer) @@ -50,19 +50,19 @@ async fn includes_channels() { .snapshot .events .into_iter() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) .exactly_one() - .expect("only one channel has been created"); - assert_eq!(channel, created.channel); + .expect("only one conversation has been created"); + assert_eq!(conversation, created.conversation); } #[tokio::test] async fn includes_messages() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; let viewer = fixtures::identity::fictitious(); let response = super::handler(State(app), viewer) @@ -84,9 +84,9 @@ async fn includes_messages() { async fn includes_expired_messages() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let expired_message = - fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; app.messages() .expire(&fixtures::now()) @@ -126,8 +126,9 @@ async fn includes_expired_messages() { async fn includes_deleted_messages() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let deleted_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let deleted_message = + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; app.messages() .delete(&sender, &deleted_message.id, &fixtures::now()) @@ -164,11 +165,11 @@ async fn includes_deleted_messages() { } #[tokio::test] -async fn includes_expired_channels() { +async fn includes_expired_conversations() { let app = fixtures::scratch_app().await; - let expired_channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let expired_conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; - app.channels() + app.conversations() .expire(&fixtures::now()) .await .expect("expiry never fails"); @@ -183,34 +184,34 @@ async fn includes_expired_channels() { .events .iter() .cloned() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) .exactly_one() - .expect("only one channel has been created"); - // We don't expect `expired_channel` to match the event exactly, as the name will have been - // tombstoned and the channel given a `deleted_at` date. - assert_eq!(expired_channel.id, created.channel.id); + .expect("only one conversation has been created"); + // We don't expect `expired_conversation` to match the event exactly, as the name will + // have been tombstoned and the conversation given a `deleted_at` date. + assert_eq!(expired_conversation.id, created.conversation.id); let deleted = response .snapshot .events .into_iter() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::deleted) .exactly_one() - .expect("only one channel has expired"); - assert_eq!(expired_channel.id, deleted.id); + .expect("only one conversation has expired"); + assert_eq!(expired_conversation.id, deleted.id); } #[tokio::test] -async fn includes_deleted_channels() { +async fn includes_deleted_conversations() { let app = fixtures::scratch_app().await; - let deleted_channel = fixtures::channel::create(&app, &fixtures::now()).await; + let deleted_conversation = fixtures::conversation::create(&app, &fixtures::now()).await; - app.channels() - .delete(&deleted_channel.id, &fixtures::now()) + app.conversations() + .delete(&deleted_conversation.id, &fixtures::now()) .await - .expect("deleting a valid channel succeeds"); + .expect("deleting a valid conversation succeeds"); let viewer = fixtures::identity::fictitious(); let response = super::handler(State(app), viewer) @@ -222,21 +223,21 @@ async fn includes_deleted_channels() { .events .iter() .cloned() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) .exactly_one() - .expect("only one channel has been created"); - // We don't expect `deleted_channel` to match the event exactly, as the name will have been - // tombstoned and the channel given a `deleted_at` date. - assert_eq!(deleted_channel.id, created.channel.id); + .expect("only one conversation has been created"); + // We don't expect `deleted_conversation` to match the event exactly, as the name will + // have been tombstoned and the conversation given a `deleted_at` date. + assert_eq!(deleted_conversation.id, created.conversation.id); let deleted = response .snapshot .events .into_iter() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::deleted) .exactly_one() - .expect("only one channel has been deleted"); - assert_eq!(deleted_channel.id, deleted.id); + .expect("only one conversation has been deleted"); + assert_eq!(deleted_conversation.id, deleted.id); } diff --git a/src/channel/app.rs b/src/channel/app.rs deleted file mode 100644 index e3b169c..0000000 --- a/src/channel/app.rs +++ /dev/null @@ -1,224 +0,0 @@ -use chrono::TimeDelta; -use itertools::Itertools; -use sqlx::sqlite::SqlitePool; - -use super::{ - Channel, Id, - repo::{LoadError, Provider as _}, - validate, -}; -use crate::{ - clock::DateTime, - db::{Duplicate as _, NotFound as _}, - event::{Broadcaster, Event, Sequence, repo::Provider as _}, - message::{self, repo::Provider as _}, - name::{self, Name}, -}; - -pub struct Channels<'a> { - db: &'a SqlitePool, - events: &'a Broadcaster, -} - -impl<'a> Channels<'a> { - pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { - Self { db, events } - } - - pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result { - if !validate::name(name) { - return Err(CreateError::InvalidName(name.clone())); - } - - let mut tx = self.db.begin().await?; - let created = tx.sequence().next(created_at).await?; - let channel = tx - .channels() - .create(name, &created) - .await - .duplicate(|| CreateError::DuplicateName(name.clone()))?; - tx.commit().await?; - - self.events - .broadcast(channel.events().map(Event::from).collect::>()); - - Ok(channel.as_created()) - } - - // This function is careless with respect to time, and gets you the channel as - // it exists in the specific moment when you call it. - pub async fn get(&self, channel: &Id) -> Result { - let to_not_found = || Error::NotFound(channel.clone()); - let to_deleted = || Error::Deleted(channel.clone()); - - let mut tx = self.db.begin().await?; - let channel = tx.channels().by_id(channel).await.not_found(to_not_found)?; - tx.commit().await?; - - channel.as_snapshot().ok_or_else(to_deleted) - } - - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { - let mut tx = self.db.begin().await?; - - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| DeleteError::NotFound(channel.clone()))?; - channel - .as_snapshot() - .ok_or_else(|| DeleteError::Deleted(channel.id().clone()))?; - - let mut events = Vec::new(); - - let messages = tx.messages().live(&channel).await?; - let has_messages = messages - .iter() - .map(message::History::as_snapshot) - .any(|message| message.is_some()); - if has_messages { - return Err(DeleteError::NotEmpty(channel.id().clone())); - } - - let deleted = tx.sequence().next(deleted_at).await?; - let channel = tx.channels().delete(&channel, &deleted).await?; - events.extend( - channel - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from), - ); - - tx.commit().await?; - - self.events.broadcast(events); - - Ok(()) - } - - pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> { - // Somewhat arbitrarily, expire after 7 days. Active channels will not be - // expired until their messages expire. - let expire_at = relative_to.to_owned() - TimeDelta::days(7); - - let mut tx = self.db.begin().await?; - let expired = tx.channels().expired(&expire_at).await?; - - let mut events = Vec::with_capacity(expired.len()); - for channel in expired { - let deleted = tx.sequence().next(relative_to).await?; - let channel = tx.channels().delete(&channel, &deleted).await?; - events.push( - channel - .events() - .filter(Sequence::start_from(deleted.sequence)), - ); - } - - tx.commit().await?; - - self.events.broadcast( - events - .into_iter() - .kmerge_by(Sequence::merge) - .map(Event::from) - .collect::>(), - ); - - Ok(()) - } - - pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { - // Somewhat arbitrarily, purge after 6 hours. - let purge_at = relative_to.to_owned() - TimeDelta::hours(6); - - let mut tx = self.db.begin().await?; - tx.channels().purge(&purge_at).await?; - tx.commit().await?; - - Ok(()) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum CreateError { - #[error("channel named {0} already exists")] - DuplicateName(Name), - #[error("invalid channel name: {0}")] - InvalidName(Name), - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From for CreateError { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("channel {0} not found")] - NotFound(Id), - #[error("channel {0} deleted")] - Deleted(Id), - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From for Error { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum DeleteError { - #[error("channel {0} not found")] - NotFound(Id), - #[error("channel {0} deleted")] - Deleted(Id), - #[error("channel {0} not empty")] - NotEmpty(Id), - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From for DeleteError { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum ExpireError { - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From for ExpireError { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} diff --git a/src/channel/event.rs b/src/channel/event.rs deleted file mode 100644 index a5739f9..0000000 --- a/src/channel/event.rs +++ /dev/null @@ -1,46 +0,0 @@ -use super::Channel; -use crate::{ - channel, - event::{Instant, Sequenced}, -}; - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -#[serde(tag = "event", rename_all = "snake_case")] -pub enum Event { - Created(Created), - Deleted(Deleted), -} - -impl Sequenced for Event { - fn instant(&self) -> Instant { - match self { - Self::Created(event) => event.channel.created, - Self::Deleted(event) => event.instant, - } - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Created { - #[serde(flatten)] - pub channel: Channel, -} - -impl From for Event { - fn from(event: Created) -> Self { - Self::Created(event) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Deleted { - #[serde(flatten)] - pub instant: Instant, - pub id: channel::Id, -} - -impl From for Event { - fn from(event: Deleted) -> Self { - Self::Deleted(event) - } -} diff --git a/src/channel/handlers/create/mod.rs b/src/channel/handlers/create/mod.rs deleted file mode 100644 index 2c860fc..0000000 --- a/src/channel/handlers/create/mod.rs +++ /dev/null @@ -1,67 +0,0 @@ -use axum::{ - extract::{Json, State}, - http::StatusCode, - response::{self, IntoResponse}, -}; - -use crate::{ - app::App, - channel::{Channel, app}, - clock::RequestedAt, - error::Internal, - name::Name, - token::extract::Identity, -}; - -#[cfg(test)] -mod test; - -pub async fn handler( - State(app): State, - _: Identity, // requires auth, but doesn't actually care who you are - RequestedAt(created_at): RequestedAt, - Json(request): Json, -) -> Result { - let channel = app - .channels() - .create(&request.name, &created_at) - .await - .map_err(Error)?; - - Ok(Response(channel)) -} - -#[derive(serde::Deserialize)] -pub struct Request { - pub name: Name, -} - -#[derive(Debug)] -pub struct Response(pub Channel); - -impl IntoResponse for Response { - fn into_response(self) -> response::Response { - let Self(channel) = self; - (StatusCode::ACCEPTED, Json(channel)).into_response() - } -} - -#[derive(Debug)] -pub struct Error(pub app::CreateError); - -impl IntoResponse for Error { - fn into_response(self) -> response::Response { - let Self(error) = self; - match error { - app::CreateError::DuplicateName(_) => { - (StatusCode::CONFLICT, error.to_string()).into_response() - } - app::CreateError::InvalidName(_) => { - (StatusCode::BAD_REQUEST, error.to_string()).into_response() - } - app::CreateError::Name(_) | app::CreateError::Database(_) => { - Internal::from(error).into_response() - } - } - } -} diff --git a/src/channel/handlers/create/test.rs b/src/channel/handlers/create/test.rs deleted file mode 100644 index 31bb778..0000000 --- a/src/channel/handlers/create/test.rs +++ /dev/null @@ -1,250 +0,0 @@ -use std::future; - -use axum::extract::{Json, State}; -use futures::stream::StreamExt as _; -use itertools::Itertools; - -use crate::{ - channel::app, - name::Name, - test::fixtures::{self, future::Expect as _}, -}; - -#[tokio::test] -async fn new_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::identity::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Call the endpoint - - let name = fixtures::channel::propose(); - let request = super::Request { name: name.clone() }; - let super::Response(response) = - super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) - .await - .expect("creating a channel in an empty app succeeds"); - - // Verify the structure of the response - - assert_eq!(name, response.name); - - // Verify the semantics - - let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - let created = snapshot - .events - .into_iter() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) - .exactly_one() - .expect("only one channel has been created"); - assert_eq!(response, created.channel); - - let channel = app - .channels() - .get(&response.id) - .await - .expect("the newly-created channel exists"); - assert_eq!(response, channel); - - let mut events = app - .events() - .subscribe(resume_point) - .await - .expect("subscribing never fails") - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::created) - .filter(|event| future::ready(event.channel == response)); - - let event = events.next().expect_some("creation event published").await; - - assert_eq!(event.channel, response); -} - -#[tokio::test] -async fn duplicate_name() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let request = super::Request { - name: channel.name.clone(), - }; - let super::Error(error) = - super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) - .await - .expect_err("duplicate channel name should fail the request"); - - // Verify the structure of the response - - assert!(matches!( - error, - app::CreateError::DuplicateName(name) if channel.name == name - )); -} - -#[tokio::test] -async fn conflicting_canonical_name() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::identity::create(&app, &fixtures::now()).await; - - let existing_name = Name::from("rijksmuseum"); - app.channels() - .create(&existing_name, &fixtures::now()) - .await - .expect("creating a channel in an empty environment succeeds"); - - let conflicting_name = Name::from("r\u{0133}ksmuseum"); - - // Call the endpoint - - let request = super::Request { - name: conflicting_name.clone(), - }; - let super::Error(error) = - super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) - .await - .expect_err("duplicate channel name should fail the request"); - - // Verify the structure of the response - - assert!(matches!( - error, - app::CreateError::DuplicateName(name) if conflicting_name == name - )); -} - -#[tokio::test] -async fn invalid_name() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::identity::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let name = fixtures::channel::propose_invalid_name(); - let request = super::Request { name: name.clone() }; - let super::Error(error) = crate::channel::handlers::create::handler( - State(app.clone()), - creator, - fixtures::now(), - Json(request), - ) - .await - .expect_err("invalid channel name should fail the request"); - - // Verify the structure of the response - - assert!(matches!( - error, - app::CreateError::InvalidName(error_name) if name == error_name - )); -} - -#[tokio::test] -async fn name_reusable_after_delete() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::identity::create(&app, &fixtures::now()).await; - let name = fixtures::channel::propose(); - - // Call the endpoint (first time) - - let request = super::Request { name: name.clone() }; - let super::Response(response) = super::handler( - State(app.clone()), - creator.clone(), - fixtures::now(), - Json(request), - ) - .await - .expect("new channel in an empty app"); - - // Delete the channel - - app.channels() - .delete(&response.id, &fixtures::now()) - .await - .expect("deleting a newly-created channel succeeds"); - - // Call the endpoint (second time) - - let request = super::Request { name: name.clone() }; - let super::Response(response) = - super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) - .await - .expect("creation succeeds after original channel deleted"); - - // Verify the structure of the response - - assert_eq!(name, response.name); - - // Verify the semantics - - let channel = app - .channels() - .get(&response.id) - .await - .expect("the newly-created channel exists"); - assert_eq!(response, channel); -} - -#[tokio::test] -async fn name_reusable_after_expiry() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::identity::create(&app, &fixtures::ancient()).await; - let name = fixtures::channel::propose(); - - // Call the endpoint (first time) - - let request = super::Request { name: name.clone() }; - let super::Response(_) = super::handler( - State(app.clone()), - creator.clone(), - fixtures::ancient(), - Json(request), - ) - .await - .expect("new channel in an empty app"); - - // Delete the channel - - app.channels() - .expire(&fixtures::now()) - .await - .expect("expiry always succeeds"); - - // Call the endpoint (second time) - - let request = super::Request { name: name.clone() }; - let super::Response(response) = - super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) - .await - .expect("creation succeeds after original channel expired"); - - // Verify the structure of the response - - assert_eq!(name, response.name); - - // Verify the semantics - - let channel = app - .channels() - .get(&response.id) - .await - .expect("the newly-created channel exists"); - assert_eq!(response, channel); -} diff --git a/src/channel/handlers/delete/mod.rs b/src/channel/handlers/delete/mod.rs deleted file mode 100644 index b986bec..0000000 --- a/src/channel/handlers/delete/mod.rs +++ /dev/null @@ -1,59 +0,0 @@ -use axum::{ - extract::{Json, Path, State}, - http::StatusCode, - response::{self, IntoResponse}, -}; - -use crate::{ - app::App, - channel::{self, app, handlers::PathInfo}, - clock::RequestedAt, - error::{Internal, NotFound}, - token::extract::Identity, -}; - -#[cfg(test)] -mod test; - -pub async fn handler( - State(app): State, - Path(channel): Path, - RequestedAt(deleted_at): RequestedAt, - _: Identity, -) -> Result { - app.channels().delete(&channel, &deleted_at).await?; - - Ok(Response { id: channel }) -} - -#[derive(Debug, serde::Serialize)] -pub struct Response { - pub id: channel::Id, -} - -impl IntoResponse for Response { - fn into_response(self) -> response::Response { - (StatusCode::ACCEPTED, Json(self)).into_response() - } -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub struct Error(#[from] pub app::DeleteError); - -impl IntoResponse for Error { - fn into_response(self) -> response::Response { - let Self(error) = self; - match error { - app::DeleteError::NotFound(_) | app::DeleteError::Deleted(_) => { - NotFound(error).into_response() - } - app::DeleteError::NotEmpty(_) => { - (StatusCode::CONFLICT, error.to_string()).into_response() - } - app::DeleteError::Name(_) | app::DeleteError::Database(_) => { - Internal::from(error).into_response() - } - } - } -} diff --git a/src/channel/handlers/delete/test.rs b/src/channel/handlers/delete/test.rs deleted file mode 100644 index 99c19db..0000000 --- a/src/channel/handlers/delete/test.rs +++ /dev/null @@ -1,184 +0,0 @@ -use axum::extract::{Path, State}; -use itertools::Itertools; - -use crate::{channel::app, test::fixtures}; - -#[tokio::test] -pub async fn valid_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - // Send the request - - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let response = super::handler( - State(app.clone()), - Path(channel.id.clone()), - fixtures::now(), - deleter, - ) - .await - .expect("deleting a valid channel succeeds"); - - // Verify the response - - assert_eq!(channel.id, response.id); - - // Verify the semantics - - let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - let created = snapshot - .events - .into_iter() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) - .exactly_one() - .expect("only one channel has been created"); - // We don't expect `channel` to match the event exactly, as the name will have been - // tombstoned and the channel given a `deleted_at` date. - assert_eq!(channel.id, created.channel.id); -} - -#[tokio::test] -pub async fn invalid_channel_id() { - // Set up the environment - - let app = fixtures::scratch_app().await; - - // Send the request - - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::fictitious(); - let super::Error(error) = super::handler( - State(app.clone()), - Path(channel.clone()), - fixtures::now(), - deleter, - ) - .await - .expect_err("deleting a nonexistent channel fails"); - - // Verify the response - - assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel)); -} - -#[tokio::test] -pub async fn channel_deleted() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - app.channels() - .delete(&channel.id, &fixtures::now()) - .await - .expect("deleting a recently-sent channel succeeds"); - - // Send the request - - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Error(error) = super::handler( - State(app.clone()), - Path(channel.id.clone()), - fixtures::now(), - deleter, - ) - .await - .expect_err("deleting a deleted channel fails"); - - // Verify the response - - assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id)); -} - -#[tokio::test] -pub async fn channel_expired() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - - app.channels() - .expire(&fixtures::now()) - .await - .expect("expiring channels always succeeds"); - - // Send the request - - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Error(error) = super::handler( - State(app.clone()), - Path(channel.id.clone()), - fixtures::now(), - deleter, - ) - .await - .expect_err("deleting an expired channel fails"); - - // Verify the response - - assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id)); -} - -#[tokio::test] -pub async fn channel_purged() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - - app.channels() - .expire(&fixtures::old()) - .await - .expect("expiring channels always succeeds"); - - app.channels() - .purge(&fixtures::now()) - .await - .expect("purging channels always succeeds"); - - // Send the request - - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Error(error) = super::handler( - State(app.clone()), - Path(channel.id.clone()), - fixtures::now(), - deleter, - ) - .await - .expect_err("deleting a purged channel fails"); - - // Verify the response - - assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel.id)); -} - -#[tokio::test] -pub async fn channel_not_empty() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - // Send the request - - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Error(error) = super::handler( - State(app.clone()), - Path(channel.id.clone()), - fixtures::now(), - deleter, - ) - .await - .expect_err("deleting a channel with messages fails"); - - // Verify the response - - assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == channel.id)); -} diff --git a/src/channel/handlers/mod.rs b/src/channel/handlers/mod.rs deleted file mode 100644 index f2ffd0d..0000000 --- a/src/channel/handlers/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -mod create; -mod delete; -mod send; - -pub use create::handler as create; -pub use delete::handler as delete; -pub use send::handler as send; - -type PathInfo = crate::channel::Id; diff --git a/src/channel/handlers/send/mod.rs b/src/channel/handlers/send/mod.rs deleted file mode 100644 index bde39e5..0000000 --- a/src/channel/handlers/send/mod.rs +++ /dev/null @@ -1,63 +0,0 @@ -use axum::{ - extract::{Json, Path, State}, - http::StatusCode, - response::{self, IntoResponse}, -}; - -use crate::channel::handlers::PathInfo; -use crate::{ - app::App, - clock::RequestedAt, - error::{Internal, NotFound}, - message::{Body, Message, app::SendError}, - token::extract::Identity, -}; - -#[cfg(test)] -mod test; - -pub async fn handler( - State(app): State, - Path(channel): Path, - RequestedAt(sent_at): RequestedAt, - identity: Identity, - Json(request): Json, -) -> Result { - let message = app - .messages() - .send(&channel, &identity.user, &sent_at, &request.body) - .await?; - - Ok(Response(message)) -} - -#[derive(serde::Deserialize)] -pub struct Request { - pub body: Body, -} - -#[derive(Debug)] -pub struct Response(pub Message); - -impl IntoResponse for Response { - fn into_response(self) -> response::Response { - let Self(message) = self; - (StatusCode::ACCEPTED, Json(message)).into_response() - } -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub struct Error(#[from] pub SendError); - -impl IntoResponse for Error { - fn into_response(self) -> response::Response { - let Self(error) = self; - match error { - SendError::ChannelNotFound(_) | SendError::ChannelDeleted(_) => { - NotFound(error).into_response() - } - SendError::Name(_) | SendError::Database(_) => Internal::from(error).into_response(), - } - } -} diff --git a/src/channel/handlers/send/test.rs b/src/channel/handlers/send/test.rs deleted file mode 100644 index 70d45eb..0000000 --- a/src/channel/handlers/send/test.rs +++ /dev/null @@ -1,130 +0,0 @@ -use axum::extract::{Json, Path, State}; -use futures::stream::{self, StreamExt as _}; - -use crate::{ - channel, - event::Sequenced, - message::app::SendError, - test::fixtures::{self, future::Expect as _}, -}; - -#[tokio::test] -async fn messages_in_order() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Call the endpoint (twice) - - let requests = vec![ - (fixtures::now(), fixtures::message::propose()), - (fixtures::now(), fixtures::message::propose()), - ]; - - for (sent_at, body) in &requests { - let request = super::Request { body: body.clone() }; - - let _ = super::handler( - State(app.clone()), - Path(channel.id.clone()), - sent_at.clone(), - sender.clone(), - Json(request), - ) - .await - .expect("sending to a valid channel succeeds"); - } - - // Verify the semantics - - let mut events = app - .events() - .subscribe(resume_point) - .await - .expect("subscribing to a valid channel succeeds") - .filter_map(fixtures::event::stream::message) - .filter_map(fixtures::event::stream::message::sent) - .zip(stream::iter(requests)); - - while let Some((event, (sent_at, body))) = events - .next() - .expect_ready("an event should be ready for each message") - .await - { - assert_eq!(*sent_at, event.at()); - assert_eq!(sender.user.id, event.message.sender); - assert_eq!(body, event.message.body); - } -} - -#[tokio::test] -async fn nonexistent_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::identity::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let sent_at = fixtures::now(); - let channel = channel::Id::generate(); - let request = super::Request { - body: fixtures::message::propose(), - }; - let super::Error(error) = super::handler( - State(app), - Path(channel.clone()), - sent_at, - sender, - Json(request), - ) - .await - .expect_err("sending to a nonexistent channel fails"); - - // Verify the structure of the response - - assert!(matches!( - error, - SendError::ChannelNotFound(error_channel) if channel == error_channel - )); -} - -#[tokio::test] -async fn deleted_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - app.channels() - .delete(&channel.id, &fixtures::now()) - .await - .expect("deleting a new channel succeeds"); - - // Call the endpoint - - let sent_at = fixtures::now(); - let request = super::Request { - body: fixtures::message::propose(), - }; - let super::Error(error) = super::handler( - State(app), - Path(channel.id.clone()), - sent_at, - sender, - Json(request), - ) - .await - .expect_err("sending to a deleted channel fails"); - - // Verify the structure of the response - - assert!(matches!( - error, - SendError::ChannelDeleted(error_channel) if channel.id == error_channel - )); -} diff --git a/src/channel/history.rs b/src/channel/history.rs deleted file mode 100644 index 85da5a5..0000000 --- a/src/channel/history.rs +++ /dev/null @@ -1,69 +0,0 @@ -use itertools::Itertools as _; - -use super::{ - Channel, Id, - event::{Created, Deleted, Event}, -}; -use crate::event::{Instant, Sequence}; - -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct History { - pub channel: Channel, - pub deleted: Option, -} - -// State interface -impl History { - pub fn id(&self) -> &Id { - &self.channel.id - } - - // Snapshot of this channel as it was when created. (Note to the future: it's - // okay if this returns a redacted or modified version of the channel. If we - // implement renames by redacting the original name, then this should return the - // renamed channel, not the original, even if that's not how it was "as - // created.") - pub fn as_created(&self) -> Channel { - self.channel.clone() - } - - pub fn as_of(&self, sequence: S) -> Option - where - S: Into, - { - self.events() - .filter(Sequence::up_to(sequence.into())) - .collect() - } - - // Snapshot of this channel as of all events recorded in this history. - pub fn as_snapshot(&self) -> Option { - self.events().collect() - } -} - -// Event factories -impl History { - pub fn events(&self) -> impl Iterator + use<> { - [self.created()] - .into_iter() - .merge_by(self.deleted(), Sequence::merge) - } - - fn created(&self) -> Event { - Created { - channel: self.channel.clone(), - } - .into() - } - - fn deleted(&self) -> Option { - self.deleted.map(|instant| { - Deleted { - instant, - id: self.channel.id.clone(), - } - .into() - }) - } -} diff --git a/src/channel/id.rs b/src/channel/id.rs deleted file mode 100644 index 22a2700..0000000 --- a/src/channel/id.rs +++ /dev/null @@ -1,38 +0,0 @@ -use std::fmt; - -use crate::id::Id as BaseId; - -// Stable identifier for a [Channel]. Prefixed with `C`. -#[derive( - Clone, - Debug, - Eq, - Hash, - Ord, - PartialEq, - PartialOrd, - sqlx::Type, - serde::Deserialize, - serde::Serialize, -)] -#[sqlx(transparent)] -#[serde(transparent)] -pub struct Id(BaseId); - -impl From for Id { - fn from(id: BaseId) -> Self { - Self(id) - } -} - -impl Id { - pub fn generate() -> Self { - BaseId::generate("C") - } -} - -impl fmt::Display for Id { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} diff --git a/src/channel/mod.rs b/src/channel/mod.rs deleted file mode 100644 index bbaf33e..0000000 --- a/src/channel/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -pub mod app; -pub mod event; -pub mod handlers; -mod history; -mod id; -pub mod repo; -mod snapshot; -mod validate; - -pub use self::{event::Event, history::History, id::Id, snapshot::Channel}; diff --git a/src/channel/repo.rs b/src/channel/repo.rs deleted file mode 100644 index fd2173a..0000000 --- a/src/channel/repo.rs +++ /dev/null @@ -1,336 +0,0 @@ -use futures::stream::{StreamExt as _, TryStreamExt as _}; -use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; - -use crate::{ - channel::{Channel, History, Id}, - clock::DateTime, - db::NotFound, - event::{Instant, Sequence}, - name::{self, Name}, -}; - -pub trait Provider { - fn channels(&mut self) -> Channels; -} - -impl Provider for Transaction<'_, Sqlite> { - fn channels(&mut self) -> Channels { - Channels(self) - } -} - -pub struct Channels<'t>(&'t mut SqliteConnection); - -impl Channels<'_> { - pub async fn create(&mut self, name: &Name, created: &Instant) -> Result { - let id = Id::generate(); - let name = name.clone(); - let display_name = name.display(); - let canonical_name = name.canonical(); - let created = *created; - - sqlx::query!( - r#" - insert into conversation (id, created_at, created_sequence, last_sequence) - values ($1, $2, $3, $4) - "#, - id, - created.at, - created.sequence, - created.sequence, - ) - .execute(&mut *self.0) - .await?; - - sqlx::query!( - r#" - insert into conversation_name (id, display_name, canonical_name) - values ($1, $2, $3) - "#, - id, - display_name, - canonical_name, - ) - .execute(&mut *self.0) - .await?; - - let channel = History { - channel: Channel { - created, - id, - name: name.clone(), - deleted_at: None, - }, - deleted: None, - }; - - Ok(channel) - } - - pub async fn by_id(&mut self, channel: &Id) -> Result { - let channel = sqlx::query!( - r#" - select - id as "id: Id", - name.display_name as "display_name?: String", - name.canonical_name as "canonical_name?: String", - conversation.created_at as "created_at: DateTime", - conversation.created_sequence as "created_sequence: Sequence", - deleted.deleted_at as "deleted_at?: DateTime", - deleted.deleted_sequence as "deleted_sequence?: Sequence" - from conversation - left join conversation_name as name - using (id) - left join conversation_deleted as deleted - using (id) - where id = $1 - "#, - channel, - ) - .map(|row| { - Ok::<_, name::Error>(History { - channel: Channel { - created: Instant::new(row.created_at, row.created_sequence), - id: row.id, - name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), - deleted_at: row.deleted_at, - }, - deleted: Instant::optional(row.deleted_at, row.deleted_sequence), - }) - }) - .fetch_one(&mut *self.0) - .await??; - - Ok(channel) - } - - pub async fn all(&mut self, resume_at: Sequence) -> Result, LoadError> { - let channels = sqlx::query!( - r#" - select - id as "id: Id", - name.display_name as "display_name?: String", - name.canonical_name as "canonical_name?: String", - conversation.created_at as "created_at: DateTime", - conversation.created_sequence as "created_sequence: Sequence", - deleted.deleted_at as "deleted_at?: DateTime", - deleted.deleted_sequence as "deleted_sequence?: Sequence" - from conversation - left join conversation_name as name - using (id) - left join conversation_deleted as deleted - using (id) - where conversation.created_sequence <= $1 - order by name.canonical_name - "#, - resume_at, - ) - .map(|row| { - Ok::<_, name::Error>(History { - channel: Channel { - created: Instant::new(row.created_at, row.created_sequence), - id: row.id, - name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), - deleted_at: row.deleted_at, - }, - deleted: Instant::optional(row.deleted_at, row.deleted_sequence), - }) - }) - .fetch(&mut *self.0) - .map(|res| Ok::<_, LoadError>(res??)) - .try_collect() - .await?; - - Ok(channels) - } - - pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { - let channels = sqlx::query!( - r#" - select - id as "id: Id", - name.display_name as "display_name?: String", - name.canonical_name as "canonical_name?: String", - conversation.created_at as "created_at: DateTime", - conversation.created_sequence as "created_sequence: Sequence", - deleted.deleted_at as "deleted_at?: DateTime", - deleted.deleted_sequence as "deleted_sequence?: Sequence" - from conversation - left join conversation_name as name - using (id) - left join conversation_deleted as deleted - using (id) - where conversation.last_sequence > $1 - "#, - resume_at, - ) - .map(|row| { - Ok::<_, name::Error>(History { - channel: Channel { - created: Instant::new(row.created_at, row.created_sequence), - id: row.id, - name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), - deleted_at: row.deleted_at, - }, - deleted: Instant::optional(row.deleted_at, row.deleted_sequence), - }) - }) - .fetch(&mut *self.0) - .map(|res| Ok::<_, LoadError>(res??)) - .try_collect() - .await?; - - Ok(channels) - } - - pub async fn delete( - &mut self, - channel: &History, - deleted: &Instant, - ) -> Result { - let id = channel.id(); - sqlx::query!( - r#" - update conversation - set last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - id, - ) - .fetch_one(&mut *self.0) - .await?; - - sqlx::query!( - r#" - insert into conversation_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a conversation is deleted, its - // name is retconned to have been the empty string. Someone reading the event - // stream afterwards, or looking at channels via the API, cannot retrieve the - // "deleted" channel's information by ignoring the deletion event. - // - // This also avoids the need for a separate name reservation table to ensure - // that live channels have unique names, since the `channel` table's name field - // is unique over non-null values. - sqlx::query!( - r#" - delete from conversation_name - where id = $1 - "#, - id, - ) - .execute(&mut *self.0) - .await?; - - let channel = self.by_id(id).await?; - - Ok(channel) - } - - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { - let channels = sqlx::query_scalar!( - r#" - with has_messages as ( - select conversation - from message - group by conversation - ) - delete from conversation_deleted - where deleted_at < $1 - and id not in has_messages - returning id as "id: Id" - "#, - purge_at, - ) - .fetch_all(&mut *self.0) - .await?; - - for channel in channels { - // Wanted: a way to batch these up into one query. - sqlx::query!( - r#" - delete from conversation - where id = $1 - "#, - channel, - ) - .execute(&mut *self.0) - .await?; - } - - Ok(()) - } - - pub async fn expired(&mut self, expired_at: &DateTime) -> Result, LoadError> { - let channels = sqlx::query!( - r#" - select - conversation.id as "id: Id", - name.display_name as "display_name?: String", - name.canonical_name as "canonical_name?: String", - conversation.created_at as "created_at: DateTime", - conversation.created_sequence as "created_sequence: Sequence", - deleted.deleted_at as "deleted_at?: DateTime", - deleted.deleted_sequence as "deleted_sequence?: Sequence" - from conversation - left join conversation_name as name - using (id) - left join conversation_deleted as deleted - using (id) - left join message - on conversation.id = message.conversation - where conversation.created_at < $1 - and message.id is null - and deleted.id is null - "#, - expired_at, - ) - .map(|row| { - Ok::<_, name::Error>(History { - channel: Channel { - created: Instant::new(row.created_at, row.created_sequence), - id: row.id, - name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), - deleted_at: row.deleted_at, - }, - deleted: Instant::optional(row.deleted_at, row.deleted_sequence), - }) - }) - .fetch(&mut *self.0) - .map(|res| Ok::<_, LoadError>(res??)) - .try_collect() - .await?; - - Ok(channels) - } -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum LoadError { - Database(#[from] sqlx::Error), - Name(#[from] name::Error), -} - -impl NotFound for Result { - type Ok = T; - type Error = LoadError; - - fn optional(self) -> Result, LoadError> { - match self { - Ok(value) => Ok(Some(value)), - Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None), - Err(other) => Err(other), - } - } -} diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs deleted file mode 100644 index 96801b8..0000000 --- a/src/channel/snapshot.rs +++ /dev/null @@ -1,43 +0,0 @@ -use super::{ - Id, - event::{Created, Event}, -}; -use crate::{clock::DateTime, event::Instant, name::Name}; - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Channel { - #[serde(flatten)] - pub created: Instant, - pub id: Id, - pub name: Name, - #[serde(skip_serializing_if = "Option::is_none")] - pub deleted_at: Option, -} - -impl Channel { - fn apply(state: Option, event: Event) -> Option { - match (state, event) { - (None, Event::Created(event)) => Some(event.into()), - (Some(channel), Event::Deleted(event)) if channel.id == event.id => None, - (state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"), - } - } -} - -impl FromIterator for Option { - fn from_iter>(events: I) -> Self { - events.into_iter().fold(None, Channel::apply) - } -} - -impl From<&Created> for Channel { - fn from(event: &Created) -> Self { - event.channel.clone() - } -} - -impl From for Channel { - fn from(event: Created) -> Self { - event.channel - } -} diff --git a/src/channel/validate.rs b/src/channel/validate.rs deleted file mode 100644 index 7894e0c..0000000 --- a/src/channel/validate.rs +++ /dev/null @@ -1,25 +0,0 @@ -use std::ops::Not as _; - -use unicode_segmentation::UnicodeSegmentation as _; - -use crate::name::Name; - -// Picked out of a hat. The power of two is not meaningful. -const NAME_TOO_LONG: usize = 64; - -pub fn name(name: &Name) -> bool { - let display = name.display(); - - [ - display.graphemes(true).count() < NAME_TOO_LONG, - display.chars().any(char::is_control).not(), - display.chars().next().is_some_and(|c| !c.is_whitespace()), - display.chars().last().is_some_and(|c| !c.is_whitespace()), - display - .chars() - .zip(display.chars().skip(1)) - .all(|(a, b)| !(a.is_whitespace() && b.is_whitespace())), - ] - .into_iter() - .all(|value| value) -} diff --git a/src/conversation/app.rs b/src/conversation/app.rs new file mode 100644 index 0000000..81ccdcf --- /dev/null +++ b/src/conversation/app.rs @@ -0,0 +1,236 @@ +use chrono::TimeDelta; +use itertools::Itertools; +use sqlx::sqlite::SqlitePool; + +use super::{ + Conversation, Id, + repo::{LoadError, Provider as _}, + validate, +}; +use crate::{ + clock::DateTime, + db::{Duplicate as _, NotFound as _}, + event::{Broadcaster, Event, Sequence, repo::Provider as _}, + message::{self, repo::Provider as _}, + name::{self, Name}, +}; + +pub struct Conversations<'a> { + db: &'a SqlitePool, + events: &'a Broadcaster, +} + +impl<'a> Conversations<'a> { + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } + } + + pub async fn create( + &self, + name: &Name, + created_at: &DateTime, + ) -> Result { + if !validate::name(name) { + return Err(CreateError::InvalidName(name.clone())); + } + + let mut tx = self.db.begin().await?; + let created = tx.sequence().next(created_at).await?; + let conversation = tx + .conversations() + .create(name, &created) + .await + .duplicate(|| CreateError::DuplicateName(name.clone()))?; + tx.commit().await?; + + self.events + .broadcast(conversation.events().map(Event::from).collect::>()); + + Ok(conversation.as_created()) + } + + // This function is careless with respect to time, and gets you the + // conversation as it exists in the specific moment when you call it. + pub async fn get(&self, conversation: &Id) -> Result { + let to_not_found = || Error::NotFound(conversation.clone()); + let to_deleted = || Error::Deleted(conversation.clone()); + + let mut tx = self.db.begin().await?; + let conversation = tx + .conversations() + .by_id(conversation) + .await + .not_found(to_not_found)?; + tx.commit().await?; + + conversation.as_snapshot().ok_or_else(to_deleted) + } + + pub async fn delete( + &self, + conversation: &Id, + deleted_at: &DateTime, + ) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + + let conversation = tx + .conversations() + .by_id(conversation) + .await + .not_found(|| DeleteError::NotFound(conversation.clone()))?; + conversation + .as_snapshot() + .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?; + + let mut events = Vec::new(); + + let messages = tx.messages().live(&conversation).await?; + let has_messages = messages + .iter() + .map(message::History::as_snapshot) + .any(|message| message.is_some()); + if has_messages { + return Err(DeleteError::NotEmpty(conversation.id().clone())); + } + + let deleted = tx.sequence().next(deleted_at).await?; + let conversation = tx.conversations().delete(&conversation, &deleted).await?; + events.extend( + conversation + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + + tx.commit().await?; + + self.events.broadcast(events); + + Ok(()) + } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> { + // Somewhat arbitrarily, expire after 7 days. Active conversation will not be + // expired until their messages expire. + let expire_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + let expired = tx.conversations().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for conversation in expired { + let deleted = tx.sequence().next(relative_to).await?; + let conversation = tx.conversations().delete(&conversation, &deleted).await?; + events.push( + conversation + .events() + .filter(Sequence::start_from(deleted.sequence)), + ); + } + + tx.commit().await?; + + self.events.broadcast( + events + .into_iter() + .kmerge_by(Sequence::merge) + .map(Event::from) + .collect::>(), + ); + + Ok(()) + } + + pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, purge after 6 hours. + let purge_at = relative_to.to_owned() - TimeDelta::hours(6); + + let mut tx = self.db.begin().await?; + tx.conversations().purge(&purge_at).await?; + tx.commit().await?; + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum CreateError { + #[error("conversation named {0} already exists")] + DuplicateName(Name), + #[error("invalid conversation name: {0}")] + InvalidName(Name), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From for CreateError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("conversation {0} not found")] + NotFound(Id), + #[error("conversation {0} deleted")] + Deleted(Id), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From for Error { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("conversation {0} not found")] + NotFound(Id), + #[error("conversation {0} deleted")] + Deleted(Id), + #[error("conversation {0} not empty")] + NotEmpty(Id), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From for DeleteError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ExpireError { + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From for ExpireError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} diff --git a/src/conversation/event.rs b/src/conversation/event.rs new file mode 100644 index 0000000..f5e8a81 --- /dev/null +++ b/src/conversation/event.rs @@ -0,0 +1,46 @@ +use super::Conversation; +use crate::{ + conversation, + event::{Instant, Sequenced}, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "event", rename_all = "snake_case")] +pub enum Event { + Created(Created), + Deleted(Deleted), +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + match self { + Self::Created(event) => event.conversation.created, + Self::Deleted(event) => event.instant, + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Created { + #[serde(flatten)] + pub conversation: Conversation, +} + +impl From for Event { + fn from(event: Created) -> Self { + Self::Created(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Deleted { + #[serde(flatten)] + pub instant: Instant, + pub id: conversation::Id, +} + +impl From for Event { + fn from(event: Deleted) -> Self { + Self::Deleted(event) + } +} diff --git a/src/conversation/handlers/create/mod.rs b/src/conversation/handlers/create/mod.rs new file mode 100644 index 0000000..18eca1f --- /dev/null +++ b/src/conversation/handlers/create/mod.rs @@ -0,0 +1,67 @@ +use axum::{ + extract::{Json, State}, + http::StatusCode, + response::{self, IntoResponse}, +}; + +use crate::{ + app::App, + clock::RequestedAt, + conversation::{Conversation, app}, + error::Internal, + name::Name, + token::extract::Identity, +}; + +#[cfg(test)] +mod test; + +pub async fn handler( + State(app): State, + _: Identity, // requires auth, but doesn't actually care who you are + RequestedAt(created_at): RequestedAt, + Json(request): Json, +) -> Result { + let conversation = app + .conversations() + .create(&request.name, &created_at) + .await + .map_err(Error)?; + + Ok(Response(conversation)) +} + +#[derive(serde::Deserialize)] +pub struct Request { + pub name: Name, +} + +#[derive(Debug)] +pub struct Response(pub Conversation); + +impl IntoResponse for Response { + fn into_response(self) -> response::Response { + let Self(conversation) = self; + (StatusCode::ACCEPTED, Json(conversation)).into_response() + } +} + +#[derive(Debug)] +pub struct Error(pub app::CreateError); + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + let Self(error) = self; + match error { + app::CreateError::DuplicateName(_) => { + (StatusCode::CONFLICT, error.to_string()).into_response() + } + app::CreateError::InvalidName(_) => { + (StatusCode::BAD_REQUEST, error.to_string()).into_response() + } + app::CreateError::Name(_) | app::CreateError::Database(_) => { + Internal::from(error).into_response() + } + } + } +} diff --git a/src/conversation/handlers/create/test.rs b/src/conversation/handlers/create/test.rs new file mode 100644 index 0000000..bc05b00 --- /dev/null +++ b/src/conversation/handlers/create/test.rs @@ -0,0 +1,250 @@ +use std::future; + +use axum::extract::{Json, State}; +use futures::stream::StreamExt as _; +use itertools::Itertools; + +use crate::{ + conversation::app, + name::Name, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn new_conversation() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Call the endpoint + + let name = fixtures::conversation::propose(); + let request = super::Request { name: name.clone() }; + let super::Response(response) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creating a conversation in an empty app succeeds"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + let created = snapshot + .events + .into_iter() + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) + .exactly_one() + .expect("only one conversation has been created"); + assert_eq!(response, created.conversation); + + let conversation = app + .conversations() + .get(&response.id) + .await + .expect("the newly-created conversation exists"); + assert_eq!(response, conversation); + + let mut events = app + .events() + .subscribe(resume_point) + .await + .expect("subscribing never fails") + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::created) + .filter(|event| future::ready(event.conversation == response)); + + let event = events.next().expect_some("creation event published").await; + + assert_eq!(event.conversation, response); +} + +#[tokio::test] +async fn duplicate_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let request = super::Request { + name: conversation.name.clone(), + }; + let super::Error(error) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect_err("duplicate conversation name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::DuplicateName(name) if conversation.name == name + )); +} + +#[tokio::test] +async fn conflicting_canonical_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + + let existing_name = Name::from("rijksmuseum"); + app.conversations() + .create(&existing_name, &fixtures::now()) + .await + .expect("creating a conversation in an empty environment succeeds"); + + let conflicting_name = Name::from("r\u{0133}ksmuseum"); + + // Call the endpoint + + let request = super::Request { + name: conflicting_name.clone(), + }; + let super::Error(error) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect_err("duplicate conversation name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::DuplicateName(name) if conflicting_name == name + )); +} + +#[tokio::test] +async fn invalid_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let name = fixtures::conversation::propose_invalid_name(); + let request = super::Request { name: name.clone() }; + let super::Error(error) = crate::conversation::handlers::create::handler( + State(app.clone()), + creator, + fixtures::now(), + Json(request), + ) + .await + .expect_err("invalid conversation name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::InvalidName(error_name) if name == error_name + )); +} + +#[tokio::test] +async fn name_reusable_after_delete() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let name = fixtures::conversation::propose(); + + // Call the endpoint (first time) + + let request = super::Request { name: name.clone() }; + let super::Response(response) = super::handler( + State(app.clone()), + creator.clone(), + fixtures::now(), + Json(request), + ) + .await + .expect("new conversation in an empty app"); + + // Delete the conversation + + app.conversations() + .delete(&response.id, &fixtures::now()) + .await + .expect("deleting a newly-created conversation succeeds"); + + // Call the endpoint (second time) + + let request = super::Request { name: name.clone() }; + let super::Response(response) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creation succeeds after original conversation deleted"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let conversation = app + .conversations() + .get(&response.id) + .await + .expect("the newly-created conversation exists"); + assert_eq!(response, conversation); +} + +#[tokio::test] +async fn name_reusable_after_expiry() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::ancient()).await; + let name = fixtures::conversation::propose(); + + // Call the endpoint (first time) + + let request = super::Request { name: name.clone() }; + let super::Response(_) = super::handler( + State(app.clone()), + creator.clone(), + fixtures::ancient(), + Json(request), + ) + .await + .expect("new conversation in an empty app"); + + // Expire the conversation + + app.conversations() + .expire(&fixtures::now()) + .await + .expect("expiry always succeeds"); + + // Call the endpoint (second time) + + let request = super::Request { name: name.clone() }; + let super::Response(response) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creation succeeds after original conversation expired"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let conversation = app + .conversations() + .get(&response.id) + .await + .expect("the newly-created conversation exists"); + assert_eq!(response, conversation); +} diff --git a/src/conversation/handlers/delete/mod.rs b/src/conversation/handlers/delete/mod.rs new file mode 100644 index 0000000..272165a --- /dev/null +++ b/src/conversation/handlers/delete/mod.rs @@ -0,0 +1,61 @@ +use axum::{ + extract::{Json, Path, State}, + http::StatusCode, + response::{self, IntoResponse}, +}; + +use crate::{ + app::App, + clock::RequestedAt, + conversation::{self, app, handlers::PathInfo}, + error::{Internal, NotFound}, + token::extract::Identity, +}; + +#[cfg(test)] +mod test; + +pub async fn handler( + State(app): State, + Path(conversation): Path, + RequestedAt(deleted_at): RequestedAt, + _: Identity, +) -> Result { + app.conversations() + .delete(&conversation, &deleted_at) + .await?; + + Ok(Response { id: conversation }) +} + +#[derive(Debug, serde::Serialize)] +pub struct Response { + pub id: conversation::Id, +} + +impl IntoResponse for Response { + fn into_response(self) -> response::Response { + (StatusCode::ACCEPTED, Json(self)).into_response() + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct Error(#[from] pub app::DeleteError); + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + let Self(error) = self; + match error { + app::DeleteError::NotFound(_) | app::DeleteError::Deleted(_) => { + NotFound(error).into_response() + } + app::DeleteError::NotEmpty(_) => { + (StatusCode::CONFLICT, error.to_string()).into_response() + } + app::DeleteError::Name(_) | app::DeleteError::Database(_) => { + Internal::from(error).into_response() + } + } + } +} diff --git a/src/conversation/handlers/delete/test.rs b/src/conversation/handlers/delete/test.rs new file mode 100644 index 0000000..2718d3b --- /dev/null +++ b/src/conversation/handlers/delete/test.rs @@ -0,0 +1,184 @@ +use axum::extract::{Path, State}; +use itertools::Itertools; + +use crate::{conversation::app, test::fixtures}; + +#[tokio::test] +pub async fn valid_conversation() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let response = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect("deleting a valid conversation succeeds"); + + // Verify the response + + assert_eq!(conversation.id, response.id); + + // Verify the semantics + + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + let created = snapshot + .events + .into_iter() + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) + .exactly_one() + .expect("only one conversation has been created"); + // We don't expect `conversation` to match the event exactly, as the name will have + // been tombstoned and the conversation given a `deleted_at` date. + assert_eq!(conversation.id, created.conversation.id); +} + +#[tokio::test] +pub async fn invalid_conversation_id() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::fictitious(); + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a nonexistent conversation fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotFound(id) if id == conversation)); +} + +#[tokio::test] +pub async fn conversation_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + + app.conversations() + .delete(&conversation.id, &fixtures::now()) + .await + .expect("deleting a recently-created conversation succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a deleted conversation fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::Deleted(id) if id == conversation.id)); +} + +#[tokio::test] +pub async fn conversation_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + + app.conversations() + .expire(&fixtures::now()) + .await + .expect("expiring conversations always succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting an expired conversation fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::Deleted(id) if id == conversation.id)); +} + +#[tokio::test] +pub async fn conversation_purged() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + + app.conversations() + .expire(&fixtures::old()) + .await + .expect("expiring conversations always succeeds"); + + app.conversations() + .purge(&fixtures::now()) + .await + .expect("purging conversations always succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a purged conversation fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotFound(id) if id == conversation.id)); +} + +#[tokio::test] +pub async fn conversation_not_empty() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a conversation with messages fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == conversation.id)); +} diff --git a/src/conversation/handlers/mod.rs b/src/conversation/handlers/mod.rs new file mode 100644 index 0000000..2fe727c --- /dev/null +++ b/src/conversation/handlers/mod.rs @@ -0,0 +1,9 @@ +mod create; +mod delete; +mod send; + +pub use create::handler as create; +pub use delete::handler as delete; +pub use send::handler as send; + +type PathInfo = crate::conversation::Id; diff --git a/src/conversation/handlers/send/mod.rs b/src/conversation/handlers/send/mod.rs new file mode 100644 index 0000000..9ec020a --- /dev/null +++ b/src/conversation/handlers/send/mod.rs @@ -0,0 +1,63 @@ +use axum::{ + extract::{Json, Path, State}, + http::StatusCode, + response::{self, IntoResponse}, +}; + +use crate::conversation::handlers::PathInfo; +use crate::{ + app::App, + clock::RequestedAt, + error::{Internal, NotFound}, + message::{Body, Message, app::SendError}, + token::extract::Identity, +}; + +#[cfg(test)] +mod test; + +pub async fn handler( + State(app): State, + Path(conversation): Path, + RequestedAt(sent_at): RequestedAt, + identity: Identity, + Json(request): Json, +) -> Result { + let message = app + .messages() + .send(&conversation, &identity.user, &sent_at, &request.body) + .await?; + + Ok(Response(message)) +} + +#[derive(serde::Deserialize)] +pub struct Request { + pub body: Body, +} + +#[derive(Debug)] +pub struct Response(pub Message); + +impl IntoResponse for Response { + fn into_response(self) -> response::Response { + let Self(message) = self; + (StatusCode::ACCEPTED, Json(message)).into_response() + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct Error(#[from] pub SendError); + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + let Self(error) = self; + match error { + SendError::ConversationNotFound(_) | SendError::ConversationDeleted(_) => { + NotFound(error).into_response() + } + SendError::Name(_) | SendError::Database(_) => Internal::from(error).into_response(), + } + } +} diff --git a/src/conversation/handlers/send/test.rs b/src/conversation/handlers/send/test.rs new file mode 100644 index 0000000..bd32510 --- /dev/null +++ b/src/conversation/handlers/send/test.rs @@ -0,0 +1,130 @@ +use axum::extract::{Json, Path, State}; +use futures::stream::{self, StreamExt as _}; + +use crate::{ + conversation, + event::Sequenced, + message::app::SendError, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn messages_in_order() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Call the endpoint (twice) + + let requests = vec![ + (fixtures::now(), fixtures::message::propose()), + (fixtures::now(), fixtures::message::propose()), + ]; + + for (sent_at, body) in &requests { + let request = super::Request { body: body.clone() }; + + let _ = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + sent_at.clone(), + sender.clone(), + Json(request), + ) + .await + .expect("sending to a valid conversation succeeds"); + } + + // Verify the semantics + + let mut events = app + .events() + .subscribe(resume_point) + .await + .expect("subscribing always succeeds") + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) + .zip(stream::iter(requests)); + + while let Some((event, (sent_at, body))) = events + .next() + .expect_ready("an event should be ready for each message") + .await + { + assert_eq!(*sent_at, event.at()); + assert_eq!(sender.user.id, event.message.sender); + assert_eq!(body, event.message.body); + } +} + +#[tokio::test] +async fn nonexistent_conversation() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let sent_at = fixtures::now(); + let conversation = conversation::Id::generate(); + let request = super::Request { + body: fixtures::message::propose(), + }; + let super::Error(error) = super::handler( + State(app), + Path(conversation.clone()), + sent_at, + sender, + Json(request), + ) + .await + .expect_err("sending to a nonexistent conversation fails"); + + // Verify the structure of the response + + assert!(matches!( + error, + SendError::ConversationNotFound(error_conversation) if conversation == error_conversation + )); +} + +#[tokio::test] +async fn deleted_conversation() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + + app.conversations() + .delete(&conversation.id, &fixtures::now()) + .await + .expect("deleting a new conversation succeeds"); + + // Call the endpoint + + let sent_at = fixtures::now(); + let request = super::Request { + body: fixtures::message::propose(), + }; + let super::Error(error) = super::handler( + State(app), + Path(conversation.id.clone()), + sent_at, + sender, + Json(request), + ) + .await + .expect_err("sending to a deleted conversation fails"); + + // Verify the structure of the response + + assert!(matches!( + error, + SendError::ConversationDeleted(error_conversation) if conversation.id == error_conversation + )); +} diff --git a/src/conversation/history.rs b/src/conversation/history.rs new file mode 100644 index 0000000..601614c --- /dev/null +++ b/src/conversation/history.rs @@ -0,0 +1,69 @@ +use itertools::Itertools as _; + +use super::{ + Conversation, Id, + event::{Created, Deleted, Event}, +}; +use crate::event::{Instant, Sequence}; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct History { + pub conversation: Conversation, + pub deleted: Option, +} + +// State interface +impl History { + pub fn id(&self) -> &Id { + &self.conversation.id + } + + // Snapshot of this conversation as it was when created. (Note to the future: + // it's okay if this returns a redacted or modified version of the conversation. + // If we implement renames by redacting the original name, then this should + // return the renamed conversation, not the original, even if that's not how + // it was "as created.") + pub fn as_created(&self) -> Conversation { + self.conversation.clone() + } + + pub fn as_of(&self, sequence: S) -> Option + where + S: Into, + { + self.events() + .filter(Sequence::up_to(sequence.into())) + .collect() + } + + // Snapshot of this conversation as of all events recorded in this history. + pub fn as_snapshot(&self) -> Option { + self.events().collect() + } +} + +// Event factories +impl History { + pub fn events(&self) -> impl Iterator + use<> { + [self.created()] + .into_iter() + .merge_by(self.deleted(), Sequence::merge) + } + + fn created(&self) -> Event { + Created { + conversation: self.conversation.clone(), + } + .into() + } + + fn deleted(&self) -> Option { + self.deleted.map(|instant| { + Deleted { + instant, + id: self.conversation.id.clone(), + } + .into() + }) + } +} diff --git a/src/conversation/id.rs b/src/conversation/id.rs new file mode 100644 index 0000000..5f37a59 --- /dev/null +++ b/src/conversation/id.rs @@ -0,0 +1,38 @@ +use std::fmt; + +use crate::id::Id as BaseId; + +// Stable identifier for a [Conversation]. Prefixed with `C`. +#[derive( + Clone, + Debug, + Eq, + Hash, + Ord, + PartialEq, + PartialOrd, + sqlx::Type, + serde::Deserialize, + serde::Serialize, +)] +#[sqlx(transparent)] +#[serde(transparent)] +pub struct Id(BaseId); + +impl From for Id { + fn from(id: BaseId) -> Self { + Self(id) + } +} + +impl Id { + pub fn generate() -> Self { + BaseId::generate("C") + } +} + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/src/conversation/mod.rs b/src/conversation/mod.rs new file mode 100644 index 0000000..3dfa187 --- /dev/null +++ b/src/conversation/mod.rs @@ -0,0 +1,10 @@ +pub mod app; +pub mod event; +pub mod handlers; +mod history; +mod id; +pub mod repo; +mod snapshot; +mod validate; + +pub use self::{event::Event, history::History, id::Id, snapshot::Conversation}; diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs new file mode 100644 index 0000000..82b5f01 --- /dev/null +++ b/src/conversation/repo.rs @@ -0,0 +1,332 @@ +use futures::stream::{StreamExt as _, TryStreamExt as _}; +use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; + +use crate::{ + clock::DateTime, + conversation::{Conversation, History, Id}, + db::NotFound, + event::{Instant, Sequence}, + name::{self, Name}, +}; + +pub trait Provider { + fn conversations(&mut self) -> Conversations; +} + +impl Provider for Transaction<'_, Sqlite> { + fn conversations(&mut self) -> Conversations { + Conversations(self) + } +} + +pub struct Conversations<'t>(&'t mut SqliteConnection); + +impl Conversations<'_> { + pub async fn create(&mut self, name: &Name, created: &Instant) -> Result { + let id = Id::generate(); + let name = name.clone(); + let display_name = name.display(); + let canonical_name = name.canonical(); + let created = *created; + + sqlx::query!( + r#" + insert into conversation (id, created_at, created_sequence, last_sequence) + values ($1, $2, $3, $4) + "#, + id, + created.at, + created.sequence, + created.sequence, + ) + .execute(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_name (id, display_name, canonical_name) + values ($1, $2, $3) + "#, + id, + display_name, + canonical_name, + ) + .execute(&mut *self.0) + .await?; + + let conversation = History { + conversation: Conversation { + created, + id, + name: name.clone(), + deleted_at: None, + }, + deleted: None, + }; + + Ok(conversation) + } + + pub async fn by_id(&mut self, conversation: &Id) -> Result { + let conversation = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where id = $1 + "#, + conversation, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch_one(&mut *self.0) + .await??; + + Ok(conversation) + } + + pub async fn all(&mut self, resume_at: Sequence) -> Result, LoadError> { + let conversations = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where conversation.created_sequence <= $1 + order by name.canonical_name + "#, + resume_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } + + pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { + let conversations = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where conversation.last_sequence > $1 + "#, + resume_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } + + pub async fn delete( + &mut self, + conversation: &History, + deleted: &Instant, + ) -> Result { + let id = conversation.id(); + sqlx::query!( + r#" + update conversation + set last_sequence = max(last_sequence, $1) + where id = $2 + returning id as "id: Id" + "#, + deleted.sequence, + id, + ) + .fetch_one(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + deleted.at, + deleted.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a conversation is deleted, its + // name is retconned to have been the empty string. Someone reading the event + // stream afterwards, or looking at conversations via the API, cannot retrieve + // the "deleted" conversation's information by ignoring the deletion event. + sqlx::query!( + r#" + delete from conversation_name + where id = $1 + "#, + id, + ) + .execute(&mut *self.0) + .await?; + + let conversation = self.by_id(id).await?; + + Ok(conversation) + } + + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { + let conversations = sqlx::query_scalar!( + r#" + with has_messages as ( + select conversation + from message + group by conversation + ) + delete from conversation_deleted + where deleted_at < $1 + and id not in has_messages + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for conversation in conversations { + // Wanted: a way to batch these up into one query. + sqlx::query!( + r#" + delete from conversation + where id = $1 + "#, + conversation, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result, LoadError> { + let conversations = sqlx::query!( + r#" + select + conversation.id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + left join message + on conversation.id = message.conversation + where conversation.created_at < $1 + and message.id is null + and deleted.id is null + "#, + expired_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum LoadError { + Database(#[from] sqlx::Error), + Name(#[from] name::Error), +} + +impl NotFound for Result { + type Ok = T; + type Error = LoadError; + + fn optional(self) -> Result, LoadError> { + match self { + Ok(value) => Ok(Some(value)), + Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None), + Err(other) => Err(other), + } + } +} diff --git a/src/conversation/snapshot.rs b/src/conversation/snapshot.rs new file mode 100644 index 0000000..da9eaae --- /dev/null +++ b/src/conversation/snapshot.rs @@ -0,0 +1,43 @@ +use super::{ + Id, + event::{Created, Event}, +}; +use crate::{clock::DateTime, event::Instant, name::Name}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Conversation { + #[serde(flatten)] + pub created: Instant, + pub id: Id, + pub name: Name, + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_at: Option, +} + +impl Conversation { + fn apply(state: Option, event: Event) -> Option { + match (state, event) { + (None, Event::Created(event)) => Some(event.into()), + (Some(conversation), Event::Deleted(event)) if conversation.id == event.id => None, + (state, event) => panic!("invalid conversation event {event:#?} for state {state:#?}"), + } + } +} + +impl FromIterator for Option { + fn from_iter>(events: I) -> Self { + events.into_iter().fold(None, Conversation::apply) + } +} + +impl From<&Created> for Conversation { + fn from(event: &Created) -> Self { + event.conversation.clone() + } +} + +impl From for Conversation { + fn from(event: Created) -> Self { + event.conversation + } +} diff --git a/src/conversation/validate.rs b/src/conversation/validate.rs new file mode 100644 index 0000000..7894e0c --- /dev/null +++ b/src/conversation/validate.rs @@ -0,0 +1,25 @@ +use std::ops::Not as _; + +use unicode_segmentation::UnicodeSegmentation as _; + +use crate::name::Name; + +// Picked out of a hat. The power of two is not meaningful. +const NAME_TOO_LONG: usize = 64; + +pub fn name(name: &Name) -> bool { + let display = name.display(); + + [ + display.graphemes(true).count() < NAME_TOO_LONG, + display.chars().any(char::is_control).not(), + display.chars().next().is_some_and(|c| !c.is_whitespace()), + display.chars().last().is_some_and(|c| !c.is_whitespace()), + display + .chars() + .zip(display.chars().skip(1)) + .all(|(a, b)| !(a.is_whitespace() && b.is_whitespace())), + ] + .into_iter() + .all(|value| value) +} diff --git a/src/event/app.rs b/src/event/app.rs index 45a9099..7359bfb 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -7,7 +7,7 @@ use sqlx::sqlite::SqlitePool; use super::{Event, Sequence, Sequenced, broadcaster::Broadcaster}; use crate::{ - channel::{self, repo::Provider as _}, + conversation::{self, repo::Provider as _}, message::{self, repo::Provider as _}, name, user::{self, repo::Provider as _}, @@ -41,10 +41,10 @@ impl<'a> Events<'a> { .filter(Sequence::after(resume_at)) .map(Event::from); - let channels = tx.channels().replay(resume_at).await?; - let channel_events = channels + let conversations = tx.conversations().replay(resume_at).await?; + let conversation_events = conversations .iter() - .map(channel::History::events) + .map(conversation::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); @@ -58,7 +58,7 @@ impl<'a> Events<'a> { .map(Event::from); let replay_events = user_events - .merge_by(channel_events, Sequence::merge) + .merge_by(conversation_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .collect::>(); let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence); @@ -98,9 +98,9 @@ impl From for Error { } } -impl From for Error { - fn from(error: channel::repo::LoadError) -> Self { - use channel::repo::LoadError; +impl From for Error { + fn from(error: conversation::repo::LoadError) -> Self { + use conversation::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), diff --git a/src/event/handlers/stream/test/channel.rs b/src/event/handlers/stream/test/channel.rs deleted file mode 100644 index 2b87ce2..0000000 --- a/src/event/handlers/stream/test/channel.rs +++ /dev/null @@ -1,273 +0,0 @@ -use axum::extract::State; -use axum_extra::extract::Query; -use futures::{future, stream::StreamExt as _}; - -use crate::test::fixtures::{self, future::Expect as _}; - -#[tokio::test] -async fn creating() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Response(events) = super::handler( - State(app.clone()), - subscriber, - None, - Query(super::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Create a channel - - let name = fixtures::channel::propose(); - let channel = app - .channels() - .create(&name, &fixtures::now()) - .await - .expect("creating a channel succeeds"); - - // Verify channel created event - - events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::created) - .filter(|event| future::ready(event.channel == channel)) - .next() - .expect_some("channel created event is delivered") - .await; -} - -#[tokio::test] -async fn previously_created() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Create a channel - - let name = fixtures::channel::propose(); - let channel = app - .channels() - .create(&name, &fixtures::now()) - .await - .expect("creating a channel succeeds"); - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Response(events) = super::handler( - State(app.clone()), - subscriber, - None, - Query(super::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Verify channel created event - - let _ = events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::created) - .filter(|event| future::ready(event.channel == channel)) - .next() - .expect_some("channel created event is delivered") - .await; -} - -#[tokio::test] -async fn expiring() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Response(events) = super::handler( - State(app.clone()), - subscriber, - None, - Query(super::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Expire channels - - app.channels() - .expire(&fixtures::now()) - .await - .expect("expiring channels always succeeds"); - - // Check for expiry event - let _ = events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) - .next() - .expect_some("a deleted channel event will be delivered") - .await; -} - -#[tokio::test] -async fn previously_expired() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Expire channels - - app.channels() - .expire(&fixtures::now()) - .await - .expect("expiring channels always succeeds"); - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Response(events) = super::handler( - State(app.clone()), - subscriber, - None, - Query(super::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expiry event - let _ = events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) - .next() - .expect_some("a deleted channel event will be delivered") - .await; -} - -#[tokio::test] -async fn deleting() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Response(events) = super::handler( - State(app.clone()), - subscriber, - None, - Query(super::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Delete the channel - - app.channels() - .delete(&channel.id, &fixtures::now()) - .await - .expect("deleting a valid channel succeeds"); - - // Check for delete event - let _ = events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) - .next() - .expect_some("a deleted channel event will be delivered") - .await; -} - -#[tokio::test] -async fn previously_deleted() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Delete the channel - - app.channels() - .delete(&channel.id, &fixtures::now()) - .await - .expect("deleting a valid channel succeeds"); - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Response(events) = super::handler( - State(app.clone()), - subscriber, - None, - Query(super::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expiry event - let _ = events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) - .next() - .expect_some("a deleted channel event will be delivered") - .await; -} - -#[tokio::test] -async fn previously_purged() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Delete and purge the channel - - app.channels() - .delete(&channel.id, &fixtures::ancient()) - .await - .expect("deleting a valid channel succeeds"); - - app.channels() - .purge(&fixtures::now()) - .await - .expect("purging channels always succeeds"); - - // Subscribe - - let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Response(events) = super::handler( - State(app.clone()), - subscriber, - None, - Query(super::QueryParams { resume_point }), - ) - .await - .expect("subscribe never fails"); - - // Check for expiry event - events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) - .next() - .expect_wait("deleted channel events not delivered") - .await; -} diff --git a/src/event/handlers/stream/test/conversation.rs b/src/event/handlers/stream/test/conversation.rs new file mode 100644 index 0000000..5e08075 --- /dev/null +++ b/src/event/handlers/stream/test/conversation.rs @@ -0,0 +1,273 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{future, stream::StreamExt as _}; + +use crate::test::fixtures::{self, future::Expect as _}; + +#[tokio::test] +async fn creating() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Create a conversation + + let name = fixtures::conversation::propose(); + let conversation = app + .conversations() + .create(&name, &fixtures::now()) + .await + .expect("creating a conversation succeeds"); + + // Verify conversation created event + + events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::created) + .filter(|event| future::ready(event.conversation == conversation)) + .next() + .expect_some("conversation created event is delivered") + .await; +} + +#[tokio::test] +async fn previously_created() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Create a conversation + + let name = fixtures::conversation::propose(); + let conversation = app + .conversations() + .create(&name, &fixtures::now()) + .await + .expect("creating a conversation succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify conversation created event + + let _ = events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::created) + .filter(|event| future::ready(event.conversation == conversation)) + .next() + .expect_some("conversation created event is delivered") + .await; +} + +#[tokio::test] +async fn expiring() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Expire conversations + + app.conversations() + .expire(&fixtures::now()) + .await + .expect("expiring conversations always succeeds"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) + .next() + .expect_some("a deleted conversation event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Expire conversations + + app.conversations() + .expire(&fixtures::now()) + .await + .expect("expiring conversation always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) + .next() + .expect_some("a deleted conversation event will be delivered") + .await; +} + +#[tokio::test] +async fn deleting() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Delete the conversation + + app.conversations() + .delete(&conversation.id, &fixtures::now()) + .await + .expect("deleting a valid conversation succeeds"); + + // Check for delete event + let _ = events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) + .next() + .expect_some("a deleted conversation event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Delete the conversation + + app.conversations() + .delete(&conversation.id, &fixtures::now()) + .await + .expect("deleting a valid conversation succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expiry event + let _ = events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) + .next() + .expect_some("a deleted conversation event will be delivered") + .await; +} + +#[tokio::test] +async fn previously_purged() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Delete and purge the conversation + + app.conversations() + .delete(&conversation.id, &fixtures::ancient()) + .await + .expect("deleting a valid conversation succeeds"); + + app.conversations() + .purge(&fixtures::now()) + .await + .expect("purging conversations always succeeds"); + + // Subscribe + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Check for expiry event + events + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) + .next() + .expect_wait("deleted conversation events not delivered") + .await; +} diff --git a/src/event/handlers/stream/test/message.rs b/src/event/handlers/stream/test/message.rs index 4369996..3fba317 100644 --- a/src/event/handlers/stream/test/message.rs +++ b/src/event/handlers/stream/test/message.rs @@ -12,7 +12,7 @@ async fn sending() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -33,7 +33,7 @@ async fn sending() { let message = app .messages() .send( - &channel.id, + &conversation.id, &sender, &fixtures::now(), &fixtures::message::propose(), @@ -57,7 +57,7 @@ async fn previously_sent() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Send a message @@ -66,7 +66,7 @@ async fn previously_sent() { let message = app .messages() .send( - &channel.id, + &conversation.id, &sender, &fixtures::now(), &fixtures::message::propose(), @@ -98,27 +98,30 @@ async fn previously_sent() { } #[tokio::test] -async fn sent_in_multiple_channels() { +async fn sent_in_multiple_conversations() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; - let channels = [ - fixtures::channel::create(&app, &fixtures::now()).await, - fixtures::channel::create(&app, &fixtures::now()).await, + let conversations = [ + fixtures::conversation::create(&app, &fixtures::now()).await, + fixtures::conversation::create(&app, &fixtures::now()).await, ]; - let messages = stream::iter(channels) - .then(|channel| { - let app = app.clone(); - let sender = sender.clone(); - let channel = channel.clone(); - async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } - }) - .collect::>() - .await; + let messages = + stream::iter(conversations) + .then(|conversation| { + let app = app.clone(); + let sender = sender.clone(); + let conversation = conversation.clone(); + async move { + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await + } + }) + .collect::>() + .await; // Call the endpoint @@ -152,14 +155,14 @@ async fn sent_sequentially() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; let messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, ]; // Subscribe @@ -196,9 +199,9 @@ async fn expiring() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe @@ -235,9 +238,9 @@ async fn previously_expired() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Expire messages @@ -274,9 +277,9 @@ async fn deleting() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe @@ -313,9 +316,9 @@ async fn previously_deleted() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Delete the message @@ -352,9 +355,9 @@ async fn previously_purged() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Purge the message diff --git a/src/event/handlers/stream/test/mod.rs b/src/event/handlers/stream/test/mod.rs index df43deb..3bc634f 100644 --- a/src/event/handlers/stream/test/mod.rs +++ b/src/event/handlers/stream/test/mod.rs @@ -1,4 +1,4 @@ -mod channel; +mod conversation; mod invite; mod message; mod resume; diff --git a/src/event/handlers/stream/test/resume.rs b/src/event/handlers/stream/test/resume.rs index 835d350..a0da692 100644 --- a/src/event/handlers/stream/test/resume.rs +++ b/src/event/handlers/stream/test/resume.rs @@ -14,15 +14,16 @@ async fn resume() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; - let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let initial_message = + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; let later_messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, ]; // Call the endpoint @@ -75,8 +76,8 @@ async fn resume() { // This test verifies a real bug I hit developing the vector-of-sequences // approach to resuming events. A small omission caused the event IDs in a -// resumed stream to _omit_ channels that were in the original stream until -// those channels also appeared in the resumed stream. +// resumed stream to _omit_ conversations that were in the original stream +// until those conversations also appeared in the resumed stream. // // Clients would see something like // * In the original stream, Cfoo=5,Cbar=8 @@ -84,8 +85,8 @@ async fn resume() { // // Disconnecting and reconnecting a second time, using event IDs from that // initial period of the first resume attempt, would then cause the second -// resume attempt to restart all other channels from the beginning, and not -// from where the first disconnection happened. +// resume attempt to restart all other conversations from the beginning, and +// not from where the first disconnection happened. // // As we have switched to a single global event sequence number, this scenario // can no longer arise, but this test is preserved because the actual behaviour @@ -97,8 +98,8 @@ async fn serial_resume() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; - let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation_a = fixtures::conversation::create(&app, &fixtures::now()).await; + let conversation_b = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -107,8 +108,8 @@ async fn serial_resume() { let resume_at = { let initial_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation_b, &sender, &fixtures::now()).await, ]; // First subscription @@ -148,11 +149,11 @@ async fn serial_resume() { // Resume after disconnect let resume_at = { let resume_messages = [ - // Note that channel_b does not appear here. The buggy behaviour - // would be masked if channel_b happened to send a new message + // Note that conversation_b does not appear here. The buggy behaviour + // would be masked if conversation_b happened to send a new message // into the resumed event stream. - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation_a, &sender, &fixtures::now()).await, ]; // Second subscription @@ -190,12 +191,12 @@ async fn serial_resume() { // Resume after disconnect a second time { - // At this point, we can send on either channel and demonstrate the - // problem. The resume point should before both of these messages, but - // after _all_ prior messages. + // At this point, we can send on either conversation and demonstrate + // the problem. The resume point should before both of these messages, + // but after _all_ prior messages. let final_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation_b, &sender, &fixtures::now()).await, ]; // Third subscription diff --git a/src/event/handlers/stream/test/token.rs b/src/event/handlers/stream/test/token.rs index e32b489..5af07a0 100644 --- a/src/event/handlers/stream/test/token.rs +++ b/src/event/handlers/stream/test/token.rs @@ -9,7 +9,7 @@ async fn terminates_on_token_expiry() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; @@ -37,9 +37,9 @@ async fn terminates_on_token_expiry() { // These should not be delivered. let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, ]; events @@ -56,7 +56,7 @@ async fn terminates_on_logout() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; @@ -83,9 +83,9 @@ async fn terminates_on_logout() { // These should not be delivered. let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, ]; events @@ -102,7 +102,7 @@ async fn terminates_on_password_change() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; @@ -133,9 +133,9 @@ async fn terminates_on_password_change() { // These should not be delivered. let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, ]; events diff --git a/src/event/mod.rs b/src/event/mod.rs index 6657243..801bcb9 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -2,7 +2,7 @@ use std::time::Duration; use axum::response::sse::{self, KeepAlive}; -use crate::{channel, message, user}; +use crate::{conversation, message, user}; pub mod app; mod broadcaster; @@ -20,7 +20,7 @@ pub use self::{ #[serde(tag = "type", rename_all = "snake_case")] pub enum Event { User(user::Event), - Channel(channel::Event), + Channel(conversation::Event), Message(message::Event), } @@ -50,8 +50,8 @@ impl From for Event { } } -impl From for Event { - fn from(event: channel::Event) -> Self { +impl From for Event { + fn from(event: conversation::Event) -> Self { Self::Channel(event) } } diff --git a/src/expire.rs b/src/expire.rs index 1427a8d..4177a53 100644 --- a/src/expire.rs +++ b/src/expire.rs @@ -6,7 +6,7 @@ use axum::{ use crate::{app::App, clock::RequestedAt, error::Internal}; -// Expires messages and channels before each request. +// Expires messages and conversations before each request. pub async fn middleware( State(app): State, RequestedAt(expired_at): RequestedAt, @@ -17,7 +17,7 @@ pub async fn middleware( app.invites().expire(&expired_at).await?; app.messages().expire(&expired_at).await?; app.messages().purge(&expired_at).await?; - app.channels().expire(&expired_at).await?; - app.channels().purge(&expired_at).await?; + app.conversations().expire(&expired_at).await?; + app.conversations().purge(&expired_at).await?; Ok(next.run(req).await) } diff --git a/src/lib.rs b/src/lib.rs index d2fa390..0fda855 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,9 +5,9 @@ mod app; mod boot; mod broadcast; -mod channel; pub mod cli; mod clock; +mod conversation; mod db; mod error; mod event; diff --git a/src/message/app.rs b/src/message/app.rs index 9792c8f..bdc2164 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -4,8 +4,8 @@ use sqlx::sqlite::SqlitePool; use super::{Body, Id, Message, repo::Provider as _}; use crate::{ - channel::{self, repo::Provider as _}, clock::DateTime, + conversation::{self, repo::Provider as _}, db::NotFound as _, event::{Broadcaster, Event, Sequence, repo::Provider as _}, name, @@ -24,23 +24,29 @@ impl<'a> Messages<'a> { pub async fn send( &self, - channel: &channel::Id, + conversation: &conversation::Id, sender: &User, sent_at: &DateTime, body: &Body, ) -> Result { - let to_not_found = || SendError::ChannelNotFound(channel.clone()); - let to_deleted = || SendError::ChannelDeleted(channel.clone()); + let to_not_found = || SendError::ConversationNotFound(conversation.clone()); + let to_deleted = || SendError::ConversationDeleted(conversation.clone()); let mut tx = self.db.begin().await?; - let channel = tx.channels().by_id(channel).await.not_found(to_not_found)?; + let conversation = tx + .conversations() + .by_id(conversation) + .await + .not_found(to_not_found)?; // Ordering: don't bother allocating a sequence number before we know the channel might // exist. let sent = tx.sequence().next(sent_at).await?; - let channel = channel.as_of(sent).ok_or_else(to_deleted)?; - - let message = tx.messages().create(&channel, sender, &sent, body).await?; + let conversation = conversation.as_of(sent).ok_or_else(to_deleted)?; + let message = tx + .messages() + .create(&conversation, sender, &sent, body) + .await?; tx.commit().await?; self.events @@ -128,19 +134,19 @@ impl<'a> Messages<'a> { #[derive(Debug, thiserror::Error)] pub enum SendError { - #[error("channel {0} not found")] - ChannelNotFound(channel::Id), - #[error("channel {0} deleted")] - ChannelDeleted(channel::Id), + #[error("conversation {0} not found")] + ConversationNotFound(conversation::Id), + #[error("conversation {0} deleted")] + ConversationDeleted(conversation::Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), } -impl From for SendError { - fn from(error: channel::repo::LoadError) -> Self { - use channel::repo::LoadError; +impl From for SendError { + fn from(error: conversation::repo::LoadError) -> Self { + use conversation::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs index f567eb7..371c7bf 100644 --- a/src/message/handlers/delete/test.rs +++ b/src/message/handlers/delete/test.rs @@ -9,8 +9,9 @@ pub async fn delete_message() { let app = fixtures::scratch_app().await; let sender = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender.user, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let message = + fixtures::message::send(&app, &conversation, &sender.user, &fixtures::now()).await; // Send the request @@ -70,8 +71,8 @@ pub async fn delete_deleted() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; app.messages() .delete(&sender, &message.id, &fixtures::now()) @@ -101,8 +102,8 @@ pub async fn delete_expired() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; app.messages() .expire(&fixtures::now()) @@ -132,8 +133,8 @@ pub async fn delete_purged() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; app.messages() .expire(&fixtures::old()) @@ -168,8 +169,8 @@ pub async fn delete_not_sender() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; // Send the request diff --git a/src/message/repo.rs b/src/message/repo.rs index 159ce8e..68f6e4a 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -2,8 +2,8 @@ use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; use super::{Body, History, Id, snapshot::Message}; use crate::{ - channel::{self, Channel}, clock::DateTime, + conversation::{self, Conversation}, event::{Instant, Sequence}, user::{self, User}, }; @@ -23,7 +23,7 @@ pub struct Messages<'t>(&'t mut SqliteConnection); impl Messages<'_> { pub async fn create( &mut self, - channel: &Channel, + conversation: &Conversation, sender: &User, sent: &Instant, body: &Body, @@ -37,14 +37,14 @@ impl Messages<'_> { values ($1, $2, $3, $4, $5, $6, $7) returning id as "id: Id", - conversation as "conversation: channel::Id", + conversation as "conversation: conversation::Id", sender as "sender: user::Id", sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence", body as "body: Body" "#, id, - channel.id, + conversation.id, sender.id, sent.at, sent.sequence, @@ -68,12 +68,15 @@ impl Messages<'_> { Ok(message) } - pub async fn live(&mut self, channel: &channel::History) -> Result, sqlx::Error> { - let channel_id = channel.id(); + pub async fn live( + &mut self, + conversation: &conversation::History, + ) -> Result, sqlx::Error> { + let conversation_id = conversation.id(); let messages = sqlx::query!( r#" select - message.conversation as "conversation: channel::Id", + message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", id as "id: Id", message.body as "body: Body", @@ -87,7 +90,7 @@ impl Messages<'_> { where message.conversation = $1 and deleted.id is null "#, - channel_id, + conversation_id, ) .map(|row| History { message: Message { @@ -110,7 +113,7 @@ impl Messages<'_> { let messages = sqlx::query!( r#" select - message.conversation as "conversation: channel::Id", + message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", message.id as "id: Id", message.body as "body: Body", @@ -147,7 +150,7 @@ impl Messages<'_> { let message = sqlx::query!( r#" select - message.conversation as "conversation: channel::Id", + message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", id as "id: Id", message.body as "body: Body", @@ -200,7 +203,7 @@ impl Messages<'_> { // Small social responsibility hack here: when a message is deleted, its body is // retconned to have been the empty string. Someone reading the event stream - // afterwards, or looking at messages in the channel, cannot retrieve the + // afterwards, or looking at messages in the conversation, cannot retrieve the // "deleted" message by ignoring the deletion event. sqlx::query!( r#" @@ -252,7 +255,7 @@ impl Messages<'_> { r#" select id as "id: Id", - message.conversation as "conversation: channel::Id", + message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", @@ -289,7 +292,7 @@ impl Messages<'_> { r#" select id as "id: Id", - message.conversation as "conversation: channel::Id", + message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs index ac067f7..0e6e9ae 100644 --- a/src/message/snapshot.rs +++ b/src/message/snapshot.rs @@ -2,13 +2,13 @@ use super::{ Body, Id, event::{Event, Sent}, }; -use crate::{channel, clock::DateTime, event::Instant, user}; +use crate::{clock::DateTime, conversation, event::Instant, user}; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Message { #[serde(flatten)] pub sent: Instant, - pub channel: channel::Id, + pub channel: conversation::Id, pub sender: user::Id, pub id: Id, pub body: Body, diff --git a/src/routes.rs b/src/routes.rs index 1e66582..ca4c60c 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -4,7 +4,7 @@ use axum::{ routing::{delete, get, post}, }; -use crate::{app::App, boot, channel, event, expire, invite, message, setup, ui, user}; +use crate::{app::App, boot, conversation, event, expire, invite, message, setup, ui, user}; pub fn routes(app: &App) -> Router { // UI routes that can be accessed before the administrator completes setup. @@ -15,7 +15,7 @@ pub fn routes(app: &App) -> Router { // UI routes that require the administrator to complete setup first. let ui_setup_required = Router::new() .route("/", get(ui::handlers::index)) - .route("/ch/{channel}", get(ui::handlers::channel)) + .route("/ch/{channel}", get(ui::handlers::conversation)) .route("/invite/{invite}", get(ui::handlers::invite)) .route("/login", get(ui::handlers::login)) .route("/me", get(ui::handlers::me)) @@ -29,9 +29,15 @@ pub fn routes(app: &App) -> Router { .route("/api/auth/login", post(user::handlers::login)) .route("/api/auth/logout", post(user::handlers::logout)) .route("/api/boot", get(boot::handlers::boot)) - .route("/api/channels", post(channel::handlers::create)) - .route("/api/channels/{channel}", post(channel::handlers::send)) - .route("/api/channels/{channel}", delete(channel::handlers::delete)) + .route("/api/channels", post(conversation::handlers::create)) + .route( + "/api/channels/{channel}", + post(conversation::handlers::send), + ) + .route( + "/api/channels/{channel}", + delete(conversation::handlers::delete), + ) .route("/api/events", get(event::handlers::stream)) .route("/api/invite", post(invite::handlers::issue)) .route("/api/invite/{invite}", get(invite::handlers::get)) diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs deleted file mode 100644 index 98048f2..0000000 --- a/src/test/fixtures/channel.rs +++ /dev/null @@ -1,38 +0,0 @@ -use faker_rand::{ - en_us::{addresses::CityName, names::FullName}, - faker_impl_from_templates, - lorem::Paragraphs, -}; -use rand; - -use crate::{ - app::App, - channel::{self, Channel}, - clock::RequestedAt, - name::Name, -}; - -pub async fn create(app: &App, created_at: &RequestedAt) -> Channel { - let name = propose(); - app.channels() - .create(&name, created_at) - .await - .expect("should always succeed if the channel is actually new") -} - -pub fn propose() -> Name { - rand::random::().to_string().into() -} - -pub fn propose_invalid_name() -> Name { - rand::random::().to_string().into() -} - -struct NameTemplate(String); -faker_impl_from_templates! { - NameTemplate; "{} {}", CityName, FullName; -} - -pub fn fictitious() -> channel::Id { - channel::Id::generate() -} diff --git a/src/test/fixtures/conversation.rs b/src/test/fixtures/conversation.rs new file mode 100644 index 0000000..fb2f58d --- /dev/null +++ b/src/test/fixtures/conversation.rs @@ -0,0 +1,38 @@ +use faker_rand::{ + en_us::{addresses::CityName, names::FullName}, + faker_impl_from_templates, + lorem::Paragraphs, +}; +use rand; + +use crate::{ + app::App, + clock::RequestedAt, + conversation::{self, Conversation}, + name::Name, +}; + +pub async fn create(app: &App, created_at: &RequestedAt) -> Conversation { + let name = propose(); + app.conversations() + .create(&name, created_at) + .await + .expect("should always succeed if the conversation is actually new") +} + +pub fn propose() -> Name { + rand::random::().to_string().into() +} + +pub fn propose_invalid_name() -> Name { + rand::random::().to_string().into() +} + +struct NameTemplate(String); +faker_impl_from_templates! { + NameTemplate; "{} {}", CityName, FullName; +} + +pub fn fictitious() -> conversation::Id { + conversation::Id::generate() +} diff --git a/src/test/fixtures/event/mod.rs b/src/test/fixtures/event/mod.rs index 691cdeb..69c79d8 100644 --- a/src/test/fixtures/event/mod.rs +++ b/src/test/fixtures/event/mod.rs @@ -2,7 +2,7 @@ use crate::event::Event; pub mod stream; -pub fn channel(event: Event) -> Option { +pub fn conversation(event: Event) -> Option { match event { Event::Channel(channel) => Some(channel), _ => None, @@ -23,8 +23,8 @@ pub fn user(event: Event) -> Option { } } -pub mod channel { - use crate::channel::{Event, event}; +pub mod conversation { + use crate::conversation::{Event, event}; pub fn created(event: Event) -> Option { match event { diff --git a/src/test/fixtures/event/stream.rs b/src/test/fixtures/event/stream.rs index 6c2a1bf..5b3621d 100644 --- a/src/test/fixtures/event/stream.rs +++ b/src/test/fixtures/event/stream.rs @@ -2,8 +2,8 @@ use std::future::{self, Ready}; use crate::{event::Event, test::fixtures::event}; -pub fn channel(event: Event) -> Ready> { - future::ready(event::channel(event)) +pub fn conversation(event: Event) -> Ready> { + future::ready(event::conversation(event)) } pub fn message(event: Event) -> Ready> { @@ -14,20 +14,20 @@ pub fn user(event: Event) -> Ready> { future::ready(event::user(event)) } -pub mod channel { +pub mod conversation { use std::future::{self, Ready}; use crate::{ - channel::{Event, event}, - test::fixtures::event::channel, + conversation::{Event, event}, + test::fixtures::event::conversation, }; pub fn created(event: Event) -> Ready> { - future::ready(channel::created(event)) + future::ready(conversation::created(event)) } pub fn deleted(event: Event) -> Ready> { - future::ready(channel::deleted(event)) + future::ready(conversation::deleted(event)) } } diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs index 2254915..03f8072 100644 --- a/src/test/fixtures/message.rs +++ b/src/test/fixtures/message.rs @@ -2,19 +2,24 @@ use faker_rand::lorem::Paragraphs; use crate::{ app::App, - channel::Channel, clock::RequestedAt, + conversation::Conversation, message::{self, Body, Message}, user::User, }; -pub async fn send(app: &App, channel: &Channel, sender: &User, sent_at: &RequestedAt) -> Message { +pub async fn send( + app: &App, + conversation: &Conversation, + sender: &User, + sent_at: &RequestedAt, +) -> Message { let body = propose(); app.messages() - .send(&channel.id, sender, sent_at, &body) + .send(&conversation.id, sender, sent_at, &body) .await - .expect("should succeed if the channel exists") + .expect("should succeed if the conversation exists") } pub fn propose() -> Body { diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 418bdb5..87d3fa1 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -3,7 +3,7 @@ use chrono::{TimeDelta, Utc}; use crate::{app::App, clock::RequestedAt, db}; pub mod boot; -pub mod channel; +pub mod conversation; pub mod cookie; pub mod event; pub mod future; diff --git a/src/ui/handlers/channel.rs b/src/ui/handlers/channel.rs deleted file mode 100644 index d3199dd..0000000 --- a/src/ui/handlers/channel.rs +++ /dev/null @@ -1,58 +0,0 @@ -use axum::{ - extract::{Path, State}, - response::{self, IntoResponse, Redirect}, -}; - -use crate::{ - app::App, - channel::{self, app}, - error::Internal, - token::extract::Identity, - ui::{ - assets::{Asset, Assets}, - error::NotFound, - }, -}; - -pub async fn handler( - State(app): State, - identity: Option, - Path(channel): Path, -) -> Result { - let _ = identity.ok_or(Error::NotLoggedIn)?; - app.channels().get(&channel).await.map_err(Error::from)?; - - Assets::index().map_err(Error::Internal) -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("channel not found")] - NotFound, - #[error("not logged in")] - NotLoggedIn, - #[error("{0}")] - Internal(Internal), -} - -impl From for Error { - fn from(error: app::Error) -> Self { - match error { - app::Error::NotFound(_) | app::Error::Deleted(_) => Self::NotFound, - other => Self::Internal(other.into()), - } - } -} - -impl IntoResponse for Error { - fn into_response(self) -> response::Response { - match self { - Self::NotFound => match Assets::index() { - Ok(asset) => NotFound(asset).into_response(), - Err(internal) => internal.into_response(), - }, - Self::NotLoggedIn => Redirect::temporary("/login").into_response(), - Self::Internal(error) => error.into_response(), - } - } -} diff --git a/src/ui/handlers/conversation.rs b/src/ui/handlers/conversation.rs new file mode 100644 index 0000000..f1bb319 --- /dev/null +++ b/src/ui/handlers/conversation.rs @@ -0,0 +1,61 @@ +use axum::{ + extract::{Path, State}, + response::{self, IntoResponse, Redirect}, +}; + +use crate::{ + app::App, + conversation::{self, app}, + error::Internal, + token::extract::Identity, + ui::{ + assets::{Asset, Assets}, + error::NotFound, + }, +}; + +pub async fn handler( + State(app): State, + identity: Option, + Path(conversation): Path, +) -> Result { + let _ = identity.ok_or(Error::NotLoggedIn)?; + app.conversations() + .get(&conversation) + .await + .map_err(Error::from)?; + + Assets::index().map_err(Error::Internal) +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("conversation not found")] + NotFound, + #[error("not logged in")] + NotLoggedIn, + #[error("{0}")] + Internal(Internal), +} + +impl From for Error { + fn from(error: app::Error) -> Self { + match error { + app::Error::NotFound(_) | app::Error::Deleted(_) => Self::NotFound, + other => Self::Internal(other.into()), + } + } +} + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + match self { + Self::NotFound => match Assets::index() { + Ok(asset) => NotFound(asset).into_response(), + Err(internal) => internal.into_response(), + }, + Self::NotLoggedIn => Redirect::temporary("/login").into_response(), + Self::Internal(error) => error.into_response(), + } + } +} diff --git a/src/ui/handlers/mod.rs b/src/ui/handlers/mod.rs index 5bfd0d6..ed0c14e 100644 --- a/src/ui/handlers/mod.rs +++ b/src/ui/handlers/mod.rs @@ -1,5 +1,5 @@ mod asset; -mod channel; +mod conversation; mod index; mod invite; mod login; @@ -7,7 +7,7 @@ mod me; mod setup; pub use asset::handler as asset; -pub use channel::handler as channel; +pub use conversation::handler as conversation; pub use index::handler as index; pub use invite::handler as invite; pub use login::handler as login; -- cgit v1.2.3 From 8d412732dc094ead3c5cf86c005d187f9624fc65 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 1 Jul 2025 14:24:36 -0400 Subject: Replace `channel` with `conversation` throughout the API. This is a **breaking change** for essentially all clients. Thankfully, there's presently just the one, so we don't need to go to much effort to accommoate that; the client is modified in this commit to adapt, users can reload their client, and life will go on. --- docs/api/SUMMARY.md | 2 +- docs/api/authentication.md | 2 +- docs/api/boot.md | 10 +- docs/api/channels-messages.md | 235 ----------------------------- docs/api/conversations-messages.md | 231 ++++++++++++++++++++++++++++ docs/api/events.md | 97 ++++++------ docs/developer/server/code-organization.md | 2 +- docs/developer/server/testing.md | 2 +- src/event/mod.rs | 6 +- src/message/repo.rs | 12 +- src/message/snapshot.rs | 2 +- src/routes.rs | 6 +- src/test/fixtures/event/mod.rs | 2 +- ui/lib/apiServer.js | 4 +- ui/lib/session.svelte.js | 10 +- ui/lib/state/remote/messages.svelte.js | 12 +- ui/lib/state/remote/state.svelte.js | 18 +-- ui/routes/(app)/ch/[channel]/+page.svelte | 4 +- 18 files changed, 321 insertions(+), 336 deletions(-) delete mode 100644 docs/api/channels-messages.md create mode 100644 docs/api/conversations-messages.md (limited to 'src') diff --git a/docs/api/SUMMARY.md b/docs/api/SUMMARY.md index 99b6352..f51fbc7 100644 --- a/docs/api/SUMMARY.md +++ b/docs/api/SUMMARY.md @@ -6,4 +6,4 @@ - [Client initialization](boot.md) - [Events](events.md) - [Invitations](invitations.md) -- [Channels and messages](channels-messages.md) +- [Conversations and messages](conversations-messages.md) diff --git a/docs/api/authentication.md b/docs/api/authentication.md index 2e9b58f..fbd5959 100644 --- a/docs/api/authentication.md +++ b/docs/api/authentication.md @@ -17,7 +17,7 @@ To create users, see [initial setup](./initial-setup.md) and [invitations](./inv ## Names - + The service handles user names using two separate forms. diff --git a/docs/api/boot.md b/docs/api/boot.md index f6e6dc2..d7e9144 100644 --- a/docs/api/boot.md +++ b/docs/api/boot.md @@ -52,7 +52,7 @@ This endpoint will respond with a status of "name": "example username" }, { - "type": "channel", + "type": "conversation", "event": "created", "at": "2025-04-14T23:58:11.421901Z", "id": "C1234abcd", @@ -62,7 +62,7 @@ This endpoint will respond with a status of "type": "message", "event": "sent", "at": "2024-09-27T23:19:10.208147Z", - "channel": "C1234abcd", + "conversation": "C1234abcd", "sender": "U1234abcd", "id": "M1312acab", "body": "beep" @@ -71,13 +71,13 @@ This endpoint will respond with a status of "type": "message", "event": "sent", "at": "2025-06-19T15:14:40.431627Z", - "channel": "Ccfdryfdb4krpy77", + "conversation": "Ccfdryfdb4krpy77", "sender": "U888j6fyc8ccrnkf", "id": "Mc6jk823wjc82734", "body": "test" }, { - "type": "channel", + "type": "conversation", "event": "created", "at": "2025-06-19T15:14:44.764263Z", "id": "C2d9y6wckph3n36x", @@ -87,7 +87,7 @@ This endpoint will respond with a status of "type": "message", "event": "sent", "at": "2025-06-19T15:29:47.376455Z", - "channel": "Ccfdryfdb4krpy77", + "conversation": "Ccfdryfdb4krpy77", "sender": "U888j6fyc8ccrnkf", "id": "M3twnj7rfk2ph744", "body": "test" diff --git a/docs/api/channels-messages.md b/docs/api/channels-messages.md deleted file mode 100644 index e762600..0000000 --- a/docs/api/channels-messages.md +++ /dev/null @@ -1,235 +0,0 @@ -# Channels and messages - -```mermaid ---- -Channel lifecycle ---- -stateDiagram-v2 - [*] --> Active : POST /api/channels - Active --> Deleted : DELETE /api/channels/C1234 - Active --> Deleted : Expiry - Deleted --> [*] : Purge -``` - -```mermaid ---- -Message lifecycle ---- -stateDiagram-v2 - [*] --> Sent : POST /api/channels/C1234 - Sent --> Deleted : DELETE /api/messages/Mabcd - Sent --> Deleted : Expiry - Deleted --> [*] : Purge -``` - -Messages allow logins to communicate with one another. Channels are the conversations to which those messages are sent. - -Every channel has a unique name, chosen when the channel is created. - -## Names - - - -The service handles channel names using two separate forms. - -The first form is as given in the request used to create the channel. This form of the channel name is used throughout the API, and the service will preserve the name as entered (other than applying normalization), so that users' preferences around capitalization and accent marks are preserved. - -The second form is a "canonical" form, used internally by the service to control uniqueness and match names to channels. The canonical form is both case-folded and normalized. - -The canonical form is not available to API clients, but its use has practical consequences. Names that differ only by case or only by code point sequence are treated as the same name. If the name is in use, changing the capitalization or changing the sequence of combining marks will not allow the creation of a second "identical" channel. - -## Expiry and purging - -Both channels and messages expire after a time. Messages expire 90 days after being sent. Channels expire 90 days after the last message sent to them, or after creation if no messages are sent in that time. - -Deleted channels and messages, including those that have expired, are temporarily retained by the service, to allow clients that are not connected to receive the corresponding deletion [events](./events.md). To limit storage growth, deleted channels and messages are purged from the service seven days after they were deleted. - -## `POST /api/channels` - -Creates a channel. - -### Request - -```json -{ - "name": "a unique channel name" -} -``` - -The request must have the following fields: - -| Field | Type | Description | -| :----- | :----- | :------------------ | -| `name` | string | The channel's name. | - -The proposed `name` must be valid. The precise definition of valid is still up in the air, but, at minimum: - -- It must be non-empty. -- It must not be "too long." (Currently, 64 characters is too long.) -- It must begin with a printing character. -- It must end with a printing character. -- It must not contain runs of multiple whitespace characters. - -### Success - -This endpoint will respond with a status of -`202 Accepted` when successful. The body of the response will be a JSON object describing the new channel: - -```json -{ - "id": "C9876cyyz", - "name": "a unique channel name" -} -``` - -The response will have the following fields: - -| Field | Type | Description | -| :----- | :----- | :-------------------------------------------------------------------------------------------------------------------------------------- | -| `id` | string | A unique identifier for the channel. This can be used to associate the channel with events, or to make API calls targeting the channel. | -| `name` | string | The channel's name. | - -The returned name may not be identical to the name requested, as the name will be converted to [normalization form C](http://www.unicode.org/reports/tr15/) automatically. The returned name will include this normalization; the service will use the normalized name elsewhere, and does not store the originally requested name. - -When completed, the service will emit a [channel created](events.md#channel-created) event with the channel's ID. - -### Name not valid - -This endpoint will respond with a status of `400 Bad Request` if the proposed `name` is not valid. - -### Channel name in use - -This endpoint will respond with a status of `409 Conflict` if a channel with the requested name already exists. - -## `POST /api/channels/:id` - -Sends a message to a channel. - -This endpoint requires the following path parameter: - -| Parameter | Type | Description | -| :-------- | :----- | :------------ | -| `id` | string | A channel ID. | - -### Request - -```json -{ - "body": "my amazing thoughts, by bob" -} -``` - -The request must have the following fields: - -| Field | Type | Description | -| :----- | :----- | :------------------------------------- | -| `body` | string | The message to deliver to the channel. | - -### Success - -This endpoint will respond with a status of -`202 Accepted` when successful. The body of the response will be a JSON object describing the newly-sent message: - -```json -{ - "at": "2024-10-19T04:37:09.467325Z", - "channel": "Cfqdn1234", - "sender": "Uabcd1234", - "id": "Mgh98yp75", - "body": "my amazing thoughts, by bob" -} -``` - -The response will have the following fields: - -| Field | Type | Description | -| :-------- | :-------- | :-------------------------------------------------------------------------------------------------------------------------------------- | -| `at` | timestamp | The moment the message was sent. | -| `channel` | string | The ID of the channel the message was sent to. | -| `sender` | string | The ID of the user that sent the message. | -| `id` | string | A unique identifier for the message. This can be used to associate the message with events, or to make API calls targeting the message. | -| `body` | string | The message's body. | - -The returned message body may not be identical to the body as sent, as the body will be converted to [normalization form C](http://www.unicode.org/reports/tr15/) automatically. The returned body will include this normalization; the service will use the normalized body elsewhere, and does not store the originally submitted body. - -When completed, the service will emit a [message sent](events.md#message-sent) event with the message's ID. - -### Invalid channel ID - -This endpoint will respond with a status of `404 Not Found` if the channel ID is not valid. - -## `DELETE /api/channels/:id` - -Deletes a channel. - -Deleting a channel prevents it from receiving any further messages. The channel must be empty; to delete a channel with messages in it, delete the messages first (or wait for them to expire). - -This endpoint requires the following path parameter: - -| Parameter | Type | Description | -| :-------- | :----- | :------------ | -| `id` | string | A channel ID. | - -### Success - -This endpoint will respond with a status of -`202 Accepted` when successful. The body of the response will be a JSON object describing the deleted channel: - -```json -{ - "id": "Cfqdn1234" -} -``` - -The response will have the following fields: - -| Field | Type | Description | -| :---- | :----- | :---------------- | -| `id` | string | The channel's ID. | - -When completed, the service will emit a [message deleted](events.md#message-deleted) event for each message in the channel, followed by a [channel deleted](events.md#channel-deleted) event with the channel's ID. - -### Channel not empty - -This endpoint will respond with a status of `409 Conflict` if the channel contains messages. - -### Invalid channel ID - -This endpoint will respond with a status of `404 Not Found` if the channel ID is not valid. - -## `DELETE /api/messages/:id` - -Deletes a message. - -This endpoint requires the following path parameter: - -| Parameter | Type | Description | -| :-------- | :----- | :------------ | -| `id` | string | A message ID. | - -### Success - -This endpoint will respond with a status of -`202 Accepted` when successful. The body of the response will be a JSON object describing the deleted message: - -```json -{ - "id": "Mgh98yp75" -} -``` - -The response will have the following fields: - -| Field | Type | Description | -| :---- | :----- | :---------------- | -| `id` | string | The message's ID. | - -When completed, the service will emit a [message deleted](events.md#message-deleted) event with the message's ID. - -### Invalid message ID - -This endpoint will respond with a status of `404 Not Found` if the message ID is not valid. - -### Not the sender - -This endpoint will respond with a status of `403 Forbidden` if the message was sent by a different login. diff --git a/docs/api/conversations-messages.md b/docs/api/conversations-messages.md new file mode 100644 index 0000000..c7995f7 --- /dev/null +++ b/docs/api/conversations-messages.md @@ -0,0 +1,231 @@ +# Conversations and messages + +```mermaid +--- +Conversation lifecycle +--- +stateDiagram-v2 + [*] --> Active : POST /api/conversations + Active --> Deleted : DELETE /api/conversations/C1234 + Active --> Deleted : Expiry + Deleted --> [*] : Purge +``` + +```mermaid +--- +Message lifecycle +--- +stateDiagram-v2 + [*] --> Sent : POST /api/conversations/C1234 + Sent --> Deleted : DELETE /api/messages/Mabcd + Sent --> Deleted : Expiry + Deleted --> [*] : Purge +``` + +Messages allow logins to communicate with one another. Conversations are the containers to which those messages are sent. + +Every conversation has a unique name, chosen when the conversation is created. + +## Names + + + +The service handles conversation names using two separate forms. + +The first form is as given in the request used to create the conversation. This form of the conversation name is used throughout the API, and the service will preserve the name as entered (other than applying normalization), so that users' preferences around capitalization and accent marks are preserved. + +The second form is a "canonical" form, used internally by the service to control uniqueness and match names to conversations. The canonical form is both case-folded and normalized. + +The canonical form is not available to API clients, but its use has practical consequences. Names that differ only by case or only by code point sequence are treated as the same name. If the name is in use, changing the capitalization or changing the sequence of combining marks will not allow the creation of a second "identical" conversation. + +## Expiry and purging + +Both conversations and messages expire after a time. Messages sent to a conversation will keep the conversation from expiring until the messages also expire. + +Deleted conversations and messages, including those that have expired, are temporarily retained by the service, to allow clients that are not connected to receive the corresponding deletion [events](./events.md). To limit storage growth, deleted conversations and messages are purged from the service seven days after they were deleted. + +## `POST /api/conversations` + +Creates a conversation. + +### Request + +```json +{ + "name": "a unique conversation name" +} +``` + +The request must have the following fields: + +| Field | Type | Description | +| :----- | :----- | :----------------------- | +| `name` | string | The conversation's name. | + +The proposed `name` must be valid. The precise definition of valid is still up in the air, but, at minimum: + +- It must be non-empty. +- It must not be "too long." (Currently, 64 characters is too long.) +- It must begin with a printing character. +- It must end with a printing character. +- It must not contain runs of multiple whitespace characters. + +### Success + +This endpoint will respond with a status of `202 Accepted` when successful. The body of the response will be a JSON object describing the new conversation: + +```json +{ + "id": "C9876cyyz", + "name": "a unique conversation name" +} +``` + +The response will have the following fields: + +| Field | Type | Description | +| :----- | :----- | :----------------------------------------------------------------------------------------------------------------------------------------------------- | +| `id` | string | A unique identifier for the conversation. This can be used to associate the conversation with events, or to make API calls targeting the conversation. | +| `name` | string | The conversation's normalized name. | + +The returned name may not be identical to the name requested, as the name will be converted to [normalization form C](http://www.unicode.org/reports/tr15/) automatically. The returned name will include this normalization; the service will use the normalized name elsewhere, and does not store the originally requested name. + +When completed, the service will emit a [conversation created](events.md#conversation-created) event with the conversation's ID. + +### Name not valid + +This endpoint will respond with a status of `400 Bad Request` if the proposed `name` is not valid. + +### Conversation name in use + +This endpoint will respond with a status of `409 Conflict` if a conversation with the requested name already exists. + +## `POST /api/conversations/:id` + +Sends a message to a conversation. + +This endpoint requires the following path parameter: + +| Parameter | Type | Description | +| :-------- | :----- | :----------------- | +| `id` | string | A conversation ID. | + +### Request + +```json +{ + "body": "my amazing thoughts, by bob" +} +``` + +The request must have the following fields: + +| Field | Type | Description | +| :----- | :----- | :------------------------------------------ | +| `body` | string | The message to deliver to the conversation. | + +### Success + +This endpoint will respond with a status of `202 Accepted` when successful. The body of the response will be a JSON object describing the newly-sent message: + +```json +{ + "at": "2024-10-19T04:37:09.467325Z", + "conversation": "Cfqdn1234", + "sender": "Uabcd1234", + "id": "Mgh98yp75", + "body": "my amazing thoughts, by bob" +} +``` + +The response will have the following fields: + +| Field | Type | Description | +| :------------- | :-------- | :-------------------------------------------------------------------------------------------------------------------------------------- | +| `at` | timestamp | The moment the message was sent. | +| `conversation` | string | The ID of the conversation the message was sent to. | +| `sender` | string | The ID of the user that sent the message. | +| `id` | string | A unique identifier for the message. This can be used to associate the message with events, or to make API calls targeting the message. | +| `body` | string | The message's normalized body. | + +The returned message body may not be identical to the body as sent, as the body will be converted to [normalization form C](http://www.unicode.org/reports/tr15/) automatically. The returned body will include this normalization; the service will use the normalized body elsewhere, and does not store the originally submitted body. + +When completed, the service will emit a [message sent](events.md#message-sent) event with the message's ID. + +### Invalid conversation ID + +This endpoint will respond with a status of `404 Not Found` if the conversation ID is not valid. + +## `DELETE /api/conversations/:id` + +Deletes a conversation. + +Deleting a conversation prevents it from receiving any further messages. The conversation must be empty; to delete a conversation with messages in it, delete the messages first (or wait for them to expire). + +This endpoint requires the following path parameter: + +| Parameter | Type | Description | +| :-------- | :----- | :----------------- | +| `id` | string | A conversation ID. | + +### Success + +This endpoint will respond with a status of `202 Accepted` when successful. The body of the response will be a JSON object describing the deleted conversation: + +```json +{ + "id": "Cfqdn1234" +} +``` + +The response will have the following fields: + +| Field | Type | Description | +| :---- | :----- | :--------------------- | +| `id` | string | The conversation's ID. | + +When completed, the service will emit a [message deleted](events.md#message-deleted) event for each message in the conversation, followed by a [conversation deleted](events.md#conversation-deleted) event with the conversation's ID. + +### Conversation not empty + +This endpoint will respond with a status of `409 Conflict` if the conversation contains messages. + +### Invalid conversation ID + +This endpoint will respond with a status of `404 Not Found` if the conversation ID is not valid. + +## `DELETE /api/messages/:id` + +Deletes a message. + +This endpoint requires the following path parameter: + +| Parameter | Type | Description | +| :-------- | :----- | :------------ | +| `id` | string | A message ID. | + +### Success + +This endpoint will respond with a status of `202 Accepted` when successful. The body of the response will be a JSON object describing the deleted message: + +```json +{ + "id": "Mgh98yp75" +} +``` + +The response will have the following fields: + +| Field | Type | Description | +| :---- | :----- | :---------------- | +| `id` | string | The message's ID. | + +When completed, the service will emit a [message deleted](events.md#message-deleted) event with the message's ID. + +### Invalid message ID + +This endpoint will respond with a status of `404 Not Found` if the message ID is not valid. + +### Not the sender + +This endpoint will respond with a status of `403 Forbidden` if the message was sent by a different login. diff --git a/docs/api/events.md b/docs/api/events.md index 570dffa..e692f82 100644 --- a/docs/api/events.md +++ b/docs/api/events.md @@ -28,13 +28,11 @@ sequenceDiagram end ``` -The core of the service is to facilitate conversations between users. Conversational activity is delivered to clients using -_events_. Each event notifies interested clients of activity sent to the service through its API. +The core of the service is to facilitate conversations between users. Conversational activity is delivered to clients using _events_. Each event notifies interested clients of activity sent to the service through its API. ## Asynchronous completion -A number of endpoints return -`202 Accepted` responses. The actions performed by those endpoints will be completed before events are delivered. To await the completion of an operation which returns this response, clients must monitor the event stream for the corresponding event. +A number of endpoints return `202 Accepted` responses. The actions performed by those endpoints will be completed before events are delivered. To await the completion of an operation which returns this response, clients must monitor the event stream for the corresponding event. ## `GET /api/events` @@ -46,14 +44,11 @@ This endpoint is designed for use with the [EventSource] DOM API, and supports s ### Query parameters -This endpoint requires a -`resume_point` (integer) query parameter. The event stream will collect events published after that point in time. The value must be obtained by calling the [ -`GET /api/boot`](./boot.md) method. +This endpoint requires a `resume_point` (integer) query parameter. The event stream will collect events published after that point in time. The value must be obtained by calling the [`GET /api/boot`](./boot.md) method. ### Request headers -This endpoint accepts an optional `last-event-id` (string) header. If present, the value must be the value of the -`id` field of the last message processed by the client. The returned event stream will begin with the following message. If absent, the returned event stream will begin from the start of the event collection. +This endpoint accepts an optional `last-event-id` (string) header. If present, the value must be the value of the `id` field of the last message processed by the client. The returned event stream will begin with the following message. If absent, the returned event stream will begin from the start of the event collection. This header is set automatically by `EventSource` when reconnecting to an event stream. @@ -69,7 +64,7 @@ data: { data: "type": "message", data: "event": "sent", data: "at": "2024-09-27T23:19:10.208147Z", -data: "channel": "C9876cyyz", +data: "conversation": "C9876cyyz", data: "sender": "U1234abcd", data: "id": "M1312acab", data: "body": "beep" @@ -78,9 +73,7 @@ data: } The service will keep the connection open, and will deliver events as they occur. -The service may terminate the connection at any time. Clients should reconnect and resume the stream, using the -`last-event-id` header to resume from the last message received. The -`id` of each event is an ephemeral ID, useful only for this purpose. +The service may terminate the connection at any time. Clients should reconnect and resume the stream, using the `last-event-id` header to resume from the last message received. The `id` of each event is an ephemeral ID, useful only for this purpose. Each event's `data` consists of a JSON object describing one event. Every event includes the following fields: @@ -133,66 +126,63 @@ These events have the `event` field set to `"created"`. They include the followi | `id` | string | A unique identifier for the newly-created user. This can be used to associate the user with other events, or to make API calls targeting the user. | | `name` | string | The user's name. | -## Channel events +## Conversation events -The following events describe changes to [channels](./channels-messages.md). +The following events describe changes to [conversations](conversations-messages.md). -These events have the `type` field set to `"channel"`. +These events have the `type` field set to `"conversation"`. -### Channel created +### Conversation created ```json { - "type": "channel", + "type": "conversation", "event": "created", "at": "2024-09-27T23:18:10.208147Z", "id": "C9876cyyz", - "name": "example channel 2" + "name": "example conversation 2" } ``` -Sent whenever a new channel is created. +Sent whenever a new conversation is created. These events have the `event` field set to `"created"`. They include the following additional fields: -| Field | Type | Description | -| :----------- | :------------------ | :---------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `at` | timestamp | The moment the channel was created. | -| `id` | string | A unique identifier for the newly-created channel. This can be used to associate the channel with other events, or to make API calls targeting the channel. | -| `name` | string | The channel's name. | -| `deleted_at` | timestamp, optional | If set, the moment the channel was deleted. | +| Field | Type | Description | +| :----------- | :------------------ | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `at` | timestamp | The moment the conversation was created. | +| `id` | string | A unique identifier for the newly-created conversation. This can be used to associate the conversation with other events, or to make API calls targeting the conversation. | +| `name` | string | The conversation's name. | +| `deleted_at` | timestamp, optional | If set, the moment the conversation was deleted. | -When a channel is deleted or expires, the `"created"` event is replaced with a tombstone -`"created"` event, so that the original channel cannot be trivially recovered from the event stream. Tombstone events have a -`deleted_at` field, and a `name` of `""`. Tombstone events for channels use an empty string as the name, and not -`null` or with the `name` field removed entirely, to simplify client development. While clients -_should_ treat deleted channels specially, for example by rendering them as "channel deleted" markers, they don't have to be - they make sense if interpreted as channels with empty names, too. +When a conversation is deleted or expires, the `"created"` event is replaced with a tombstone +`"created"` event, so that the original conversation cannot be trivially recovered from the event stream. Tombstone events have a `deleted_at` field, and a `name` of `""`. Tombstone events for conversation use an empty string as the name, and not `null` or with the `name` field removed entirely, to simplify client development. While clients _should_ treat deleted conversation specially, for example by rendering them as "conversation deleted" markers, they don't have to be - they make sense if interpreted as conversation with empty names, too. -Once a deleted channel is [purged](./channels-messages.md#expiry-and-purging), these tombstone events are removed from the event stream. +Once a deleted conversation is [purged](conversations-messages.md#expiry-and-purging), these tombstone events are removed from the event stream. -### Channel deleted +### Conversation deleted ```json { - "type": "channel", + "type": "conversation", "event": "deleted", "at": "2024-09-28T03:40:25.384318Z", "id": "C9876cyyz" } ``` -Sent whenever a channel is deleted or expires. +Sent whenever a conversation is deleted or expires. These events have the `event` field set to `"deleted"`. They include the following additional fields: -| Field | Type | Description | -| :---- | :-------- | :---------------------------------- | -| `at` | timestamp | The moment the channel was deleted. | -| `id` | string | The deleted channel's ID. | +| Field | Type | Description | +| :---- | :-------- | :--------------------------------------- | +| `at` | timestamp | The moment the conversation was deleted. | +| `id` | string | The deleted conversation's ID. | ## Message events -The following events describe changes to [messages](./channels-messages.md). +The following events describe changes to [messages](conversations-messages.md). These events have the `type` field set to `"message"`. @@ -203,7 +193,7 @@ These events have the `type` field set to `"message"`. "type": "message", "event": "sent", "at": "2024-09-27T23:19:10.208147Z", - "channel": "C9876cyyz", + "conversation": "C9876cyyz", "sender": "U1234abcd", "id": "M1312acab", "body": "an effusive blob of prose, condensed down to a single string" @@ -214,22 +204,19 @@ Sent whenever a message is sent by a client. These events have the `event` field set to `"sent"`. They include the following additional fields: -| Field | Type | Description | -| :----------- | :------------------ | :-------------------------------------------------------------------------------------------------------------------------------------------- | -| `at` | timestamp | The moment the message was sent. | -| `channel` | string | The ID of the channel the message was sent to. | -| `sender` | string | The ID of the user that sent the message. | -| `id` | string | A unique identifier for the message. This can be used to associate the message with other events, or to make API calls targeting the message. | -| `body` | string | The text of the message. | -| `deleted_at` | timestamp, optional | If set, the moment the message was deleted. | - -When a message is deleted or expires, the `"sent"` event is replaced with a tombstone -`"sent"` event, so that the original message cannot be trivially recovered from the event stream. Tombstone events have a -`deleted_at` field, and a `body` of `""`. Tombstone events for messages use an empty string as the `body`, and not -`null` or with the `body` field removed entirely, to simplify client development. While clients +| Field | Type | Description | +| :------------- | :------------------ | :-------------------------------------------------------------------------------------------------------------------------------------------- | +| `at` | timestamp | The moment the message was sent. | +| `conversation` | string | The ID of the conversation the message was sent to. | +| `sender` | string | The ID of the user that sent the message. | +| `id` | string | A unique identifier for the message. This can be used to associate the message with other events, or to make API calls targeting the message. | +| `body` | string | The text of the message. | +| `deleted_at` | timestamp, optional | If set, the moment the message was deleted. | + +When a message is deleted or expires, the `"sent"` event is replaced with a tombstone `"sent"` event, so that the original message cannot be trivially recovered from the event stream. Tombstone events have a `deleted_at` field, and a `body` of `""`. Tombstone events for messages use an empty string as the `body`, and not `null` or with the `body` field removed entirely, to simplify client development. While clients _should_ treat deleted messages specially, for example by rendering them as "message deleted" markers, they don't have to be - they make sense if interpreted as messages with empty bodies, too. -Once a deleted message is [purged](./channels-messages.md#expiry-and-purging), these tombstone events are removed from the event stream. +Once a deleted message is [purged](conversations-messages.md#expiry-and-purging), these tombstone events are removed from the event stream. ### Message deleted diff --git a/docs/developer/server/code-organization.md b/docs/developer/server/code-organization.md index d17b604..3a691a2 100644 --- a/docs/developer/server/code-organization.md +++ b/docs/developer/server/code-organization.md @@ -6,7 +6,7 @@ Trust your gut, and reorganize to meet new needs. We've already revised this sch ## Topic modules -High-level concerns are grouped into topical modules. These include `crate::channel`, `crate::events`, `crate::login`, and others. Those modules generally contain their own app types, their own repo types, their own extractors, and any other supporting code they need. They may also provide an interface to other modules in the program. +High-level concerns are grouped into topical modules. These include `crate::conversation`, `crate::events`, `crate::login`, and others. Those modules generally contain their own app types, their own repo types, their own extractors, and any other supporting code they need. They may also provide an interface to other modules in the program. Most topic modules contain one or more of: diff --git a/docs/developer/server/testing.md b/docs/developer/server/testing.md index 8e87568..a3109cb 100644 --- a/docs/developer/server/testing.md +++ b/docs/developer/server/testing.md @@ -22,6 +22,6 @@ Prefer writing "flat" fixtures that do one thing, over compound fixtures that do Prefer role-specific names for test values: use, for example, `sender` for a login related to sending messages, rather than `login`. Fixture data is cheap, so make as many entities as make sense for the test. They'll vanish at the end of the test anyways. -Prefer testing a single endpoint at a time. Other interactions, which may be needed to set up the scenario or verify the results, should be done against the `app` abstraction directly. It's okay if this leads to redundant tests (see for example `src/channel/routes/test/on_send.rs` and `src/events/routes/test.rs`, which overlap heavily). +Prefer testing a single endpoint at a time. Other interactions, which may be needed to set up the scenario or verify the results, should be done against the `app` abstraction directly. It's okay if this leads to redundant tests (see for example `src/conversation/routes/test/on_send.rs` and `src/events/routes/test.rs`, which overlap heavily). Panicking in tests is fine. Panic messages should describe why the preconditions were expected, and can be terse. diff --git a/src/event/mod.rs b/src/event/mod.rs index 801bcb9..f41dc9c 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -20,7 +20,7 @@ pub use self::{ #[serde(tag = "type", rename_all = "snake_case")] pub enum Event { User(user::Event), - Channel(conversation::Event), + Conversation(conversation::Event), Message(message::Event), } @@ -38,7 +38,7 @@ impl Sequenced for Event { fn instant(&self) -> Instant { match self { Self::User(event) => event.instant(), - Self::Channel(event) => event.instant(), + Self::Conversation(event) => event.instant(), Self::Message(event) => event.instant(), } } @@ -52,7 +52,7 @@ impl From for Event { impl From for Event { fn from(event: conversation::Event) -> Self { - Self::Channel(event) + Self::Conversation(event) } } diff --git a/src/message/repo.rs b/src/message/repo.rs index 68f6e4a..9b65a67 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -54,7 +54,7 @@ impl Messages<'_> { .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), - channel: row.conversation, + conversation: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), @@ -95,7 +95,7 @@ impl Messages<'_> { .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), - channel: row.conversation, + conversation: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), @@ -132,7 +132,7 @@ impl Messages<'_> { .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), - channel: row.conversation, + conversation: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), @@ -168,7 +168,7 @@ impl Messages<'_> { .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), - channel: row.conversation, + conversation: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), @@ -274,7 +274,7 @@ impl Messages<'_> { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), id: row.id, - channel: row.conversation, + conversation: row.conversation, sender: row.sender, body: row.body.unwrap_or_default(), deleted_at: row.deleted_at, @@ -309,7 +309,7 @@ impl Messages<'_> { .map(|row| History { message: Message { sent: Instant::new(row.sent_at, row.sent_sequence), - channel: row.conversation, + conversation: row.conversation, sender: row.sender, id: row.id, body: row.body.unwrap_or_default(), diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs index 0e6e9ae..12d4daa 100644 --- a/src/message/snapshot.rs +++ b/src/message/snapshot.rs @@ -8,7 +8,7 @@ use crate::{clock::DateTime, conversation, event::Instant, user}; pub struct Message { #[serde(flatten)] pub sent: Instant, - pub channel: conversation::Id, + pub conversation: conversation::Id, pub sender: user::Id, pub id: Id, pub body: Body, diff --git a/src/routes.rs b/src/routes.rs index ca4c60c..e38f744 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -29,13 +29,13 @@ pub fn routes(app: &App) -> Router { .route("/api/auth/login", post(user::handlers::login)) .route("/api/auth/logout", post(user::handlers::logout)) .route("/api/boot", get(boot::handlers::boot)) - .route("/api/channels", post(conversation::handlers::create)) + .route("/api/conversations", post(conversation::handlers::create)) .route( - "/api/channels/{channel}", + "/api/conversations/{conversation}", post(conversation::handlers::send), ) .route( - "/api/channels/{channel}", + "/api/conversations/{conversation}", delete(conversation::handlers::delete), ) .route("/api/events", get(event::handlers::stream)) diff --git a/src/test/fixtures/event/mod.rs b/src/test/fixtures/event/mod.rs index 69c79d8..08b17e7 100644 --- a/src/test/fixtures/event/mod.rs +++ b/src/test/fixtures/event/mod.rs @@ -4,7 +4,7 @@ pub mod stream; pub fn conversation(event: Event) -> Option { match event { - Event::Channel(channel) => Some(channel), + Event::Conversation(conversation) => Some(conversation), _ => None, } } diff --git a/ui/lib/apiServer.js b/ui/lib/apiServer.js index 397638c..1bca6f6 100644 --- a/ui/lib/apiServer.js +++ b/ui/lib/apiServer.js @@ -28,11 +28,11 @@ export async function changePassword(password, to) { } export async function createChannel(name) { - return await apiServer.post('/channels', { name }).catch(responseError); + return await apiServer.post('/conversations', { name }).catch(responseError); } export async function postToChannel(channelId, body) { - return await apiServer.post(`/channels/${channelId}`, { body }).catch(responseError); + return await apiServer.post(`/conversations/${channelId}`, { body }).catch(responseError); } export async function deleteMessage(messageId) { diff --git a/ui/lib/session.svelte.js b/ui/lib/session.svelte.js index 838401c..0c73e00 100644 --- a/ui/lib/session.svelte.js +++ b/ui/lib/session.svelte.js @@ -11,7 +11,7 @@ import { DateTime } from 'luxon'; class Channel { static fromRemote({ at, id, name }, messages, meta) { const sentAt = messages - .filter((message) => message.channel === id) + .filter((message) => message.conversation === id) .map((message) => message.at); const lastEventAt = DateTime.max(at, ...sentAt); const lastReadAt = meta.get(id)?.lastReadAt; @@ -29,21 +29,21 @@ class Channel { } class Message { - static fromRemote({ id, at, channel, sender, body, renderedBody }, users) { + static fromRemote({ id, at, conversation, sender, body, renderedBody }, users) { return new Message({ id, at, - channel, + conversation, sender: users.get(sender), body, renderedBody, }); } - constructor({ id, at, channel, sender, body, renderedBody }) { + constructor({ id, at, conversation, sender, body, renderedBody }) { this.id = id; this.at = at; - this.channel = channel; + this.conversation = conversation; this.sender = sender; this.body = body; this.renderedBody = renderedBody; diff --git a/ui/lib/state/remote/messages.svelte.js b/ui/lib/state/remote/messages.svelte.js index 1be001b..852f29e 100644 --- a/ui/lib/state/remote/messages.svelte.js +++ b/ui/lib/state/remote/messages.svelte.js @@ -2,21 +2,21 @@ import { DateTime } from 'luxon'; import { render } from '$lib/markdown.js'; class Message { - static boot({ id, at, channel, sender, body }) { + static boot({ id, at, conversation, sender, body }) { return new Message({ id, at: DateTime.fromISO(at), - channel, + conversation, sender, body, renderedBody: render(body), }); } - constructor({ id, at, channel, sender, body, renderedBody }) { + constructor({ id, at, conversation, sender, body, renderedBody }) { this.id = id; this.at = at; - this.channel = channel; + this.conversation = conversation; this.sender = sender; this.body = body; this.renderedBody = renderedBody; @@ -26,8 +26,8 @@ class Message { export class Messages { all = $state([]); - add({ id, at, channel, sender, body }) { - const message = Message.boot({ id, at, channel, sender, body }); + add({ id, at, conversation, sender, body }) { + const message = Message.boot({ id, at, conversation, sender, body }); this.all.push(message); } diff --git a/ui/lib/state/remote/state.svelte.js b/ui/lib/state/remote/state.svelte.js index fb46489..ffc88c6 100644 --- a/ui/lib/state/remote/state.svelte.js +++ b/ui/lib/state/remote/state.svelte.js @@ -30,8 +30,8 @@ export class State { // Heartbeats are actually completely ignored here. They're handled in `Session`, but not as a // special case; _any_ event is a heartbeat event. switch (event.type) { - case 'channel': - return this.onChannelEvent(event); + case 'conversation': + return this.onConversationEvent(event); case 'user': return this.onUserEvent(event); case 'message': @@ -39,21 +39,21 @@ export class State { } } - onChannelEvent(event) { + onConversationEvent(event) { switch (event.event) { case 'created': - return this.onChannelCreated(event); + return this.onConversationCreated(event); case 'deleted': - return this.onChannelDeleted(event); + return this.onConversationDeleted(event); } } - onChannelCreated(event) { + onConversationCreated(event) { const { id, name } = event; this.channels.add({ id, name }); } - onChannelDeleted(event) { + onConversationDeleted(event) { const { id } = event; this.channels.remove(id); } @@ -80,8 +80,8 @@ export class State { } onMessageSent(event) { - const { id, at, channel, sender, body } = event; - this.messages.add({ id, at, channel, sender, body }); + const { id, at, conversation, sender, body } = event; + this.messages.add({ id, at, conversation, sender, body }); } onMessageDeleted(event) { diff --git a/ui/routes/(app)/ch/[channel]/+page.svelte b/ui/routes/(app)/ch/[channel]/+page.svelte index 87918f7..ef000fc 100644 --- a/ui/routes/(app)/ch/[channel]/+page.svelte +++ b/ui/routes/(app)/ch/[channel]/+page.svelte @@ -12,7 +12,9 @@ const channelId = $derived(page.params.channel); const channel = $derived(session.channels.find((channel) => channel.id === channelId)); - const messages = $derived(session.messages.filter((message) => message.channel === channelId)); + const messages = $derived( + session.messages.filter((message) => message.conversation === channelId), + ); const unsent = $derived(outbox.messages.filter((message) => message.channel === channelId)); const deleted = $derived(outbox.deleted.map((message) => message.messageId)); const unsentSkeletons = $derived( -- cgit v1.2.3 From 1cafeb5ec92c1dc4ad74fbed58b15a8ab2f3c0cf Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 1 Jul 2025 15:30:44 -0400 Subject: Move the `/ch` channel view to `/c` (for conversation). --- src/routes.rs | 2 +- ui/lib/components/Channel.svelte | 2 +- ui/routes/(app)/+layout.svelte | 4 +- ui/routes/(app)/c/[conversation]/+page.svelte | 117 ++++++++++++++++++++++++++ ui/routes/(app)/ch/[channel]/+page.svelte | 117 -------------------------- 5 files changed, 121 insertions(+), 121 deletions(-) create mode 100644 ui/routes/(app)/c/[conversation]/+page.svelte delete mode 100644 ui/routes/(app)/ch/[channel]/+page.svelte (limited to 'src') diff --git a/src/routes.rs b/src/routes.rs index e38f744..49d9fb6 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -15,7 +15,7 @@ pub fn routes(app: &App) -> Router { // UI routes that require the administrator to complete setup first. let ui_setup_required = Router::new() .route("/", get(ui::handlers::index)) - .route("/ch/{channel}", get(ui::handlers::conversation)) + .route("/c/{conversation}", get(ui::handlers::conversation)) .route("/invite/{invite}", get(ui::handlers::invite)) .route("/login", get(ui::handlers::login)) .route("/me", get(ui::handlers::me)) diff --git a/ui/lib/components/Channel.svelte b/ui/lib/components/Channel.svelte index 4f908d2..9004e50 100644 --- a/ui/lib/components/Channel.svelte +++ b/ui/lib/components/Channel.svelte @@ -3,7 +3,7 @@
  • - + {#if hasUnreads} {:else} diff --git a/ui/routes/(app)/+layout.svelte b/ui/routes/(app)/+layout.svelte index c7e1f22..e3272bc 100644 --- a/ui/routes/(app)/+layout.svelte +++ b/ui/routes/(app)/+layout.svelte @@ -20,7 +20,7 @@ onDestroy(session.end.bind(session)); let pageContext = getContext('page'); - let channel = $derived(page.params.channel); + let channel = $derived(page.params.conversation); let channels = $derived(session.channels); @@ -60,7 +60,7 @@ const lastActiveChannel = getLastActiveChannel(); const inRoot = page.url.pathname === '/'; if (inRoot && lastActiveChannel) { - goto(`/ch/${lastActiveChannel}`); + goto(`/c/${lastActiveChannel}`); } else if (channel) { setLastActiveChannel(channel || null); } diff --git a/ui/routes/(app)/c/[conversation]/+page.svelte b/ui/routes/(app)/c/[conversation]/+page.svelte new file mode 100644 index 0000000..4d2cc86 --- /dev/null +++ b/ui/routes/(app)/c/[conversation]/+page.svelte @@ -0,0 +1,117 @@ + + + + +
    + {#each messageRuns as { sender, ownMessage, messages }} + + {#each messages as message} + + {/each} + + {/each} +
    +
    + +
    diff --git a/ui/routes/(app)/ch/[channel]/+page.svelte b/ui/routes/(app)/ch/[channel]/+page.svelte deleted file mode 100644 index ef000fc..0000000 --- a/ui/routes/(app)/ch/[channel]/+page.svelte +++ /dev/null @@ -1,117 +0,0 @@ - - - - -
    - {#each messageRuns as { sender, ownMessage, messages }} - - {#each messages as message} - - {/each} - - {/each} -
    -
    - -
    -- cgit v1.2.3