summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.sqlx/query-40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1.json (renamed from .sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json)14
-rw-r--r--.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json (renamed from .sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json)14
-rw-r--r--src/channel/repo/messages.rs5
-rw-r--r--src/events.rs29
4 files changed, 32 insertions, 30 deletions
diff --git a/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json b/.sqlx/query-40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1.json
index 93bbe5e..4118fe3 100644
--- a/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json
+++ b/.sqlx/query-40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n ",
+ "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n body,\n sent_at as \"sent_at: DateTime\"\n ",
"describe": {
"columns": [
{
@@ -14,18 +14,13 @@
"type_info": "Text"
},
{
- "name": "channel: ChannelId",
- "ordinal": 2,
- "type_info": "Text"
- },
- {
"name": "body",
- "ordinal": 3,
+ "ordinal": 2,
"type_info": "Text"
},
{
"name": "sent_at: DateTime",
- "ordinal": 4,
+ "ordinal": 3,
"type_info": "Text"
}
],
@@ -36,9 +31,8 @@
false,
false,
false,
- false,
false
]
},
- "hash": "9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e"
+ "hash": "40759cdaeb1dddcda384b8ea28a7421b39d697ae2211cfebe8caaf12072540c1"
}
diff --git a/.sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json b/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json
index f2ba465..b94bb4b 100644
--- a/.sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json
+++ b/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.channel as \"channel: ChannelId\",\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n and coalesce(sent_at > $2, true)\n order by sent_at asc\n ",
+ "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n and coalesce(sent_at > $2, true)\n order by sent_at asc\n ",
"describe": {
"columns": [
{
@@ -19,18 +19,13 @@
"type_info": "Text"
},
{
- "name": "channel: ChannelId",
- "ordinal": 3,
- "type_info": "Text"
- },
- {
"name": "body",
- "ordinal": 4,
+ "ordinal": 3,
"type_info": "Text"
},
{
"name": "sent_at: DateTime",
- "ordinal": 5,
+ "ordinal": 4,
"type_info": "Text"
}
],
@@ -42,9 +37,8 @@
false,
false,
false,
- false,
false
]
},
- "hash": "9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80"
+ "hash": "64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130"
}
diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs
index e15a02a..2c89724 100644
--- a/src/channel/repo/messages.rs
+++ b/src/channel/repo/messages.rs
@@ -26,7 +26,6 @@ pub struct Messages<'t>(&'t mut SqliteConnection);
pub struct BroadcastMessage {
pub id: Id,
pub sender: Login,
- pub channel: ChannelId,
pub body: String,
pub sent_at: DateTime,
}
@@ -51,7 +50,6 @@ impl<'c> Messages<'c> {
returning
id as "id: Id",
sender as "sender: LoginId",
- channel as "channel: ChannelId",
body,
sent_at as "sent_at: DateTime"
"#,
@@ -66,7 +64,6 @@ impl<'c> Messages<'c> {
BroadcastMessage {
id: row.id,
sender: sender.clone(),
- channel: row.channel,
body: row.body,
sent_at: row.sent_at,
}
@@ -88,7 +85,6 @@ impl<'c> Messages<'c> {
message.id as "id: Id",
login.id as "sender_id: LoginId",
login.name as sender_name,
- message.channel as "channel: ChannelId",
message.body,
message.sent_at as "sent_at: DateTime"
from message
@@ -106,7 +102,6 @@ impl<'c> Messages<'c> {
id: row.sender_id,
name: row.sender_name,
},
- channel: row.channel,
body: row.body,
sent_at: row.sent_at,
})
diff --git a/src/events.rs b/src/events.rs
index 2d1e1f8..51f06b4 100644
--- a/src/events.rs
+++ b/src/events.rs
@@ -49,21 +49,33 @@ async fn on_events(
let streams = stream::iter(query.channels)
.then(|channel| {
let app = app.clone();
- async move { app.channels().events(&channel, resume_at.as_ref()).await }
+ async move {
+ let events = app
+ .channels()
+ .events(&channel, resume_at.as_ref())
+ .await?
+ .map_ok(move |message| ChannelEvent {
+ channel: channel.clone(),
+ message,
+ });
+
+ Ok::<_, BoxedError>(events)
+ }
})
.try_collect::<Vec<_>>()
.await?;
- let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg)));
+ let stream = stream::select_all(streams).and_then(|msg| future::ready(to_sse_event(msg)));
let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default());
Ok(sse)
}
-fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
- let data = serde_json::to_string(&msg)?;
+fn to_sse_event(event: ChannelEvent<BroadcastMessage>) -> Result<sse::Event, BoxedError> {
+ let data = serde_json::to_string(&event)?;
let event = sse::Event::default()
- .id(msg
+ .id(event
+ .message
.sent_at
.to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
.data(&data);
@@ -71,6 +83,13 @@ fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
Ok(event)
}
+#[derive(serde::Serialize)]
+struct ChannelEvent<M> {
+ channel: ChannelId,
+ #[serde(flatten)]
+ message: M,
+}
+
pub struct LastEventId(pub String);
static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");