From a48c19f8c933291a3e65a3143eabda03e8bf1ede Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Sun, 15 Sep 2024 22:39:23 -0400 Subject: Annotate channel events with channel ID at the router, not intrinsically. This bugged me aesthetically. At `app.channel().events(channel)`, the caller knows the channel ID; they don't need to be told. Having the same info come back out in the returned events felt bad. --- ...ea28a7421b39d697ae2211cfebe8caaf12072540c1.json | 38 ++++++++++++++++ ...99a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json | 44 +++++++++++++++++++ ...205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json | 44 ------------------- ...3016ce5938e2aa3955e1391c69f70d40d50cafec80.json | 50 ---------------------- src/channel/repo/messages.rs | 5 --- src/events.rs | 29 ++++++++++--- 6 files changed, 106 insertions(+), 104 deletions(-) create mode 100644 .sqlx/query-40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1.json create mode 100644 .sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json delete mode 100644 .sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json delete mode 100644 .sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json diff --git a/.sqlx/query-40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1.json b/.sqlx/query-40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1.json new file mode 100644 index 0000000..4118fe3 --- /dev/null +++ b/.sqlx/query-40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1.json @@ -0,0 +1,38 @@ +{ + "db_name": "SQLite", + "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n body,\n sent_at as \"sent_at: DateTime\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender: LoginId", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + } + ], + "parameters": { + "Right": 5 + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1" +} diff --git a/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json b/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json new file mode 100644 index 0000000..b94bb4b --- /dev/null +++ b/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n and coalesce(sent_at > $2, true)\n order by sent_at asc\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender_id: LoginId", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender_name", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130" +} diff --git a/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json b/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json deleted file mode 100644 index 93bbe5e..0000000 --- a/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender: LoginId", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "channel: ChannelId", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "body", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 5 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e" -} diff --git a/.sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json b/.sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json deleted file mode 100644 index f2ba465..0000000 --- a/.sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.channel as \"channel: ChannelId\",\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n and coalesce(sent_at > $2, true)\n order by sent_at asc\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender_id: LoginId", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender_name", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "channel: ChannelId", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "body", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 5, - "type_info": "Text" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - false, - false, - false, - false, - false, - false - ] - }, - "hash": "9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80" -} diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs index e15a02a..2c89724 100644 --- a/src/channel/repo/messages.rs +++ b/src/channel/repo/messages.rs @@ -26,7 +26,6 @@ pub struct Messages<'t>(&'t mut SqliteConnection); pub struct BroadcastMessage { pub id: Id, pub sender: Login, - pub channel: ChannelId, pub body: String, pub sent_at: DateTime, } @@ -51,7 +50,6 @@ impl<'c> Messages<'c> { returning id as "id: Id", sender as "sender: LoginId", - channel as "channel: ChannelId", body, sent_at as "sent_at: DateTime" "#, @@ -66,7 +64,6 @@ impl<'c> Messages<'c> { BroadcastMessage { id: row.id, sender: sender.clone(), - channel: row.channel, body: row.body, sent_at: row.sent_at, } @@ -88,7 +85,6 @@ impl<'c> Messages<'c> { message.id as "id: Id", login.id as "sender_id: LoginId", login.name as sender_name, - message.channel as "channel: ChannelId", message.body, message.sent_at as "sent_at: DateTime" from message @@ -106,7 +102,6 @@ impl<'c> Messages<'c> { id: row.sender_id, name: row.sender_name, }, - channel: row.channel, body: row.body, sent_at: row.sent_at, }) diff --git a/src/events.rs b/src/events.rs index 2d1e1f8..51f06b4 100644 --- a/src/events.rs +++ b/src/events.rs @@ -49,21 +49,33 @@ async fn on_events( let streams = stream::iter(query.channels) .then(|channel| { let app = app.clone(); - async move { app.channels().events(&channel, resume_at.as_ref()).await } + async move { + let events = app + .channels() + .events(&channel, resume_at.as_ref()) + .await? + .map_ok(move |message| ChannelEvent { + channel: channel.clone(), + message, + }); + + Ok::<_, BoxedError>(events) + } }) .try_collect::>() .await?; - let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg))); + let stream = stream::select_all(streams).and_then(|msg| future::ready(to_sse_event(msg))); let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default()); Ok(sse) } -fn to_event(msg: BroadcastMessage) -> Result { - let data = serde_json::to_string(&msg)?; +fn to_sse_event(event: ChannelEvent) -> Result { + let data = serde_json::to_string(&event)?; let event = sse::Event::default() - .id(msg + .id(event + .message .sent_at .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) .data(&data); @@ -71,6 +83,13 @@ fn to_event(msg: BroadcastMessage) -> Result { Ok(event) } +#[derive(serde::Serialize)] +struct ChannelEvent { + channel: ChannelId, + #[serde(flatten)] + message: M, +} + pub struct LastEventId(pub String); static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); -- cgit v1.2.3