diff options
| -rw-r--r-- | .sqlx/query-40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1.json (renamed from .sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json) | 14 | ||||
| -rw-r--r-- | .sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json (renamed from .sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json) | 14 | ||||
| -rw-r--r-- | src/channel/repo/messages.rs | 5 | ||||
| -rw-r--r-- | src/events.rs | 29 |
4 files changed, 32 insertions, 30 deletions
diff --git a/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json b/.sqlx/query-40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1.json index 93bbe5e..4118fe3 100644 --- a/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json +++ b/.sqlx/query-40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n ", + "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n body,\n sent_at as \"sent_at: DateTime\"\n ", "describe": { "columns": [ { @@ -14,18 +14,13 @@ "type_info": "Text" }, { - "name": "channel: ChannelId", - "ordinal": 2, - "type_info": "Text" - }, - { "name": "body", - "ordinal": 3, + "ordinal": 2, "type_info": "Text" }, { "name": "sent_at: DateTime", - "ordinal": 4, + "ordinal": 3, "type_info": "Text" } ], @@ -36,9 +31,8 @@ false, false, false, - false, false ] }, - "hash": "9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e" + "hash": "40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1" } diff --git a/.sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json b/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json index f2ba465..b94bb4b 100644 --- a/.sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json +++ b/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.channel as \"channel: ChannelId\",\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n and coalesce(sent_at > $2, true)\n order by sent_at asc\n ", + "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n and coalesce(sent_at > $2, true)\n order by sent_at asc\n ", "describe": { "columns": [ { @@ -19,18 +19,13 @@ "type_info": "Text" }, { - "name": "channel: ChannelId", - "ordinal": 3, - "type_info": "Text" - }, - { "name": "body", - "ordinal": 4, + "ordinal": 3, "type_info": "Text" }, { "name": "sent_at: DateTime", - "ordinal": 5, + "ordinal": 4, "type_info": "Text" } ], @@ -42,9 +37,8 @@ false, false, false, - false, false ] }, - "hash": "9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80" + "hash": "64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130" } diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs index e15a02a..2c89724 100644 --- a/src/channel/repo/messages.rs +++ b/src/channel/repo/messages.rs @@ -26,7 +26,6 @@ pub struct Messages<'t>(&'t mut SqliteConnection); pub struct BroadcastMessage { pub id: Id, pub sender: Login, - pub channel: ChannelId, pub body: String, pub sent_at: DateTime, } @@ -51,7 +50,6 @@ impl<'c> Messages<'c> { returning id as "id: Id", sender as "sender: LoginId", - channel as "channel: ChannelId", body, sent_at as "sent_at: DateTime" "#, @@ -66,7 +64,6 @@ impl<'c> Messages<'c> { BroadcastMessage { id: row.id, sender: sender.clone(), - channel: row.channel, body: row.body, sent_at: row.sent_at, } @@ -88,7 +85,6 @@ impl<'c> Messages<'c> { message.id as "id: Id", login.id as "sender_id: LoginId", login.name as sender_name, - message.channel as "channel: ChannelId", message.body, message.sent_at as "sent_at: DateTime" from message @@ -106,7 +102,6 @@ impl<'c> Messages<'c> { id: row.sender_id, name: row.sender_name, }, - channel: row.channel, body: row.body, sent_at: row.sent_at, }) diff --git a/src/events.rs b/src/events.rs index 2d1e1f8..51f06b4 100644 --- a/src/events.rs +++ b/src/events.rs @@ -49,21 +49,33 @@ async fn on_events( let streams = stream::iter(query.channels) .then(|channel| { let app = app.clone(); - async move { app.channels().events(&channel, resume_at.as_ref()).await } + async move { + let events = app + .channels() + .events(&channel, resume_at.as_ref()) + .await? + .map_ok(move |message| ChannelEvent { + channel: channel.clone(), + message, + }); + + Ok::<_, BoxedError>(events) + } }) .try_collect::<Vec<_>>() .await?; - let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg))); + let stream = stream::select_all(streams).and_then(|msg| future::ready(to_sse_event(msg))); let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default()); Ok(sse) } -fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> { - let data = serde_json::to_string(&msg)?; +fn to_sse_event(event: ChannelEvent<BroadcastMessage>) -> Result<sse::Event, BoxedError> { + let data = serde_json::to_string(&event)?; let event = sse::Event::default() - .id(msg + .id(event + .message .sent_at .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) .data(&data); @@ -71,6 +83,13 @@ fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> { Ok(event) } +#[derive(serde::Serialize)] +struct ChannelEvent<M> { + channel: ChannelId, + #[serde(flatten)] + message: M, +} + pub struct LastEventId(pub String); static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); |
