summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json (renamed from .sqlx/query-5df9d6889d5e057c3260d4956cdb313786b458082db232919de1a5e5195df7ee.json)20
-rw-r--r--.sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json (renamed from .sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json)14
-rw-r--r--Cargo.lock14
-rw-r--r--Cargo.toml2
-rw-r--r--js/channel.js23
-rw-r--r--src/channel/app.rs5
-rw-r--r--src/channel/header.rs34
-rw-r--r--src/channel/mod.rs1
-rw-r--r--src/channel/repo/messages.rs19
-rw-r--r--src/channel/routes.rs54
-rw-r--r--src/cli.rs4
-rw-r--r--src/events.rs105
-rw-r--r--src/index/templates.rs3
-rw-r--r--src/lib.rs1
14 files changed, 185 insertions, 114 deletions
diff --git a/.sqlx/query-5df9d6889d5e057c3260d4956cdb313786b458082db232919de1a5e5195df7ee.json b/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json
index 7fd7ae3..93bbe5e 100644
--- a/.sqlx/query-5df9d6889d5e057c3260d4956cdb313786b458082db232919de1a5e5195df7ee.json
+++ b/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n body,\n sent_at as \"sent_at: DateTime\"\n ",
+ "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n ",
"describe": {
"columns": [
{
@@ -9,14 +9,24 @@
"type_info": "Text"
},
{
- "name": "body",
+ "name": "sender: LoginId",
"ordinal": 1,
"type_info": "Text"
},
{
- "name": "sent_at: DateTime",
+ "name": "channel: ChannelId",
"ordinal": 2,
"type_info": "Text"
+ },
+ {
+ "name": "body",
+ "ordinal": 3,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent_at: DateTime",
+ "ordinal": 4,
+ "type_info": "Text"
}
],
"parameters": {
@@ -25,8 +35,10 @@
"nullable": [
false,
false,
+ false,
+ false,
false
]
},
- "hash": "5df9d6889d5e057c3260d4956cdb313786b458082db232919de1a5e5195df7ee"
+ "hash": "9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e"
}
diff --git a/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json b/.sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json
index b94bb4b..f2ba465 100644
--- a/.sqlx/query-64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130.json
+++ b/.sqlx/query-9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n and coalesce(sent_at > $2, true)\n order by sent_at asc\n ",
+ "query": "\n select\n message.id as \"id: Id\",\n login.id as \"sender_id: LoginId\",\n login.name as sender_name,\n message.channel as \"channel: ChannelId\",\n message.body,\n message.sent_at as \"sent_at: DateTime\"\n from message\n join login on message.sender = login.id\n where channel = $1\n and coalesce(sent_at > $2, true)\n order by sent_at asc\n ",
"describe": {
"columns": [
{
@@ -19,14 +19,19 @@
"type_info": "Text"
},
{
- "name": "body",
+ "name": "channel: ChannelId",
"ordinal": 3,
"type_info": "Text"
},
{
- "name": "sent_at: DateTime",
+ "name": "body",
"ordinal": 4,
"type_info": "Text"
+ },
+ {
+ "name": "sent_at: DateTime",
+ "ordinal": 5,
+ "type_info": "Text"
}
],
"parameters": {
@@ -37,8 +42,9 @@
false,
false,
false,
+ false,
false
]
},
- "hash": "64276398f24ab7f7d2783199a6eb6396bbd068a4b52f7b2cb47c7ec3b3e55130"
+ "hash": "9dbb15da4d7cc22bd2f2623016ce5938e2aa3955e1391c69f70d40d50cafec80"
}
diff --git a/Cargo.lock b/Cargo.lock
index 9f2c8f5..448bef3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -210,6 +210,7 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
+ "serde_html_form",
"tower",
"tower-layer",
"tower-service",
@@ -1525,6 +1526,19 @@ dependencies = [
]
[[package]]
+name = "serde_html_form"
+version = "0.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8de514ef58196f1fc96dcaef80fe6170a1ce6215df9687a93fe8300e773fefc5"
+dependencies = [
+ "form_urlencoded",
+ "indexmap",
+ "itoa",
+ "ryu",
+ "serde",
+]
+
+[[package]]
name = "serde_json"
version = "1.0.128"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 05a880b..505378d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,7 +7,7 @@ edition = "2021"
argon2 = "0.5.3"
async-trait = "0.1.82"
axum = { version = "0.7.5", features = ["form"] }
-axum-extra = { version = "0.9.3", features = ["cookie", "typed-header"] }
+axum-extra = { version = "0.9.3", features = ["cookie", "query", "typed-header"] }
chrono = { version = "0.4.38", features = ["serde"] }
clap = { version = "4.5.16", features = ["derive", "env"] }
futures = "0.3.30"
diff --git a/js/channel.js b/js/channel.js
index 96d1061..f994ada 100644
--- a/js/channel.js
+++ b/js/channel.js
@@ -9,6 +9,7 @@ function ready(callback) {
}
ready(() => {
+ let channel = document.querySelector('meta[name=channel]').content;
let template = document.querySelector('#message').content;
document.querySelectorAll('link[rel=events]').forEach(elem => {
@@ -17,18 +18,20 @@ ready(() => {
source.addEventListener('message', message => {
let body = JSON.parse(message.data);
- document.querySelectorAll('.messages').forEach(elem => {
- let message = template.cloneNode(true);
+ if (body.channel === channel) {
+ document.querySelectorAll('.messages').forEach(elem => {
+ let message = template.cloneNode(true);
- message.querySelectorAll('.sender')
- .forEach(elem => elem.textContent = body.sender.name);
- message.querySelectorAll('.message')
- .forEach(elem => elem.textContent = body.body);
- message.querySelectorAll('.sent_at')
- .forEach(elem => elem.textContent = body.sent_at);
+ message.querySelectorAll('.sender')
+ .forEach(elem => elem.textContent = body.sender.name);
+ message.querySelectorAll('.message')
+ .forEach(elem => elem.textContent = body.body);
+ message.querySelectorAll('.sent_at')
+ .forEach(elem => elem.textContent = body.sent_at);
- message.childNodes.forEach(node => elem.appendChild(node));
- });
+ message.childNodes.forEach(node => elem.appendChild(node));
+ });
+ }
});
});
})
diff --git a/src/channel/app.rs b/src/channel/app.rs
index c0a6d60..5417a5e 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex, MutexGuard};
use futures::{
future,
stream::{self, StreamExt as _, TryStreamExt as _},
- TryStream,
+ Stream,
};
use sqlx::sqlite::SqlitePool;
use tokio::sync::broadcast::{channel, Sender};
@@ -57,7 +57,8 @@ impl<'a> Channels<'a> {
&self,
channel: &ChannelId,
resume_at: Option<&DateTime>,
- ) -> Result<impl TryStream<Ok = BroadcastMessage, Error = BoxedError>, BoxedError> {
+ ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>> + 'static, BoxedError>
+ {
fn skip_stale<E>(
resume_at: Option<&DateTime>,
) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> {
diff --git a/src/channel/header.rs b/src/channel/header.rs
deleted file mode 100644
index eda8214..0000000
--- a/src/channel/header.rs
+++ /dev/null
@@ -1,34 +0,0 @@
-use axum::http::{HeaderName, HeaderValue};
-
-pub struct LastEventId(pub String);
-
-static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");
-
-impl headers::Header for LastEventId {
- fn name() -> &'static HeaderName {
- &LAST_EVENT_ID
- }
-
- fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
- where
- I: Iterator<Item = &'i HeaderValue>,
- {
- let value = values.next().ok_or_else(headers::Error::invalid)?;
- if let Ok(value) = value.to_str() {
- Ok(Self(value.into()))
- } else {
- Err(headers::Error::invalid())
- }
- }
-
- fn encode<E>(&self, values: &mut E)
- where
- E: Extend<HeaderValue>,
- {
- let Self(value) = self;
- // Must panic or suppress; the trait provides no other options.
- let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value");
-
- values.extend(std::iter::once(value));
- }
-}
diff --git a/src/channel/mod.rs b/src/channel/mod.rs
index bc2cc6c..f67ea04 100644
--- a/src/channel/mod.rs
+++ b/src/channel/mod.rs
@@ -1,5 +1,4 @@
pub mod app;
-mod header;
pub mod repo;
mod routes;
diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs
index b465f61..e15a02a 100644
--- a/src/channel/repo/messages.rs
+++ b/src/channel/repo/messages.rs
@@ -26,6 +26,7 @@ pub struct Messages<'t>(&'t mut SqliteConnection);
pub struct BroadcastMessage {
pub id: Id,
pub sender: Login,
+ pub channel: ChannelId,
pub body: String,
pub sent_at: DateTime,
}
@@ -49,6 +50,8 @@ impl<'c> Messages<'c> {
values ($1, $2, $3, $4, $5)
returning
id as "id: Id",
+ sender as "sender: LoginId",
+ channel as "channel: ChannelId",
body,
sent_at as "sent_at: DateTime"
"#,
@@ -58,11 +61,15 @@ impl<'c> Messages<'c> {
body,
sent_at,
)
- .map(|row| BroadcastMessage {
- sender: sender.clone(),
- id: row.id,
- body: row.body,
- sent_at: row.sent_at,
+ .map(|row| {
+ debug_assert!(row.sender == sender.id);
+ BroadcastMessage {
+ id: row.id,
+ sender: sender.clone(),
+ channel: row.channel,
+ body: row.body,
+ sent_at: row.sent_at,
+ }
})
.fetch_one(&mut *self.0)
.await?;
@@ -81,6 +88,7 @@ impl<'c> Messages<'c> {
message.id as "id: Id",
login.id as "sender_id: LoginId",
login.name as sender_name,
+ message.channel as "channel: ChannelId",
message.body,
message.sent_at as "sent_at: DateTime"
from message
@@ -98,6 +106,7 @@ impl<'c> Messages<'c> {
id: row.sender_id,
name: row.sender_name,
},
+ channel: row.channel,
body: row.body,
sent_at: row.sent_at,
})
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index eae68a2..3c2353b 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -1,30 +1,17 @@
use axum::{
extract::{Form, Path, State},
- response::{
- sse::{self, Sse},
- IntoResponse, Redirect,
- },
- routing::{get, post},
+ response::{IntoResponse, Redirect},
+ routing::post,
Router,
};
-use axum_extra::TypedHeader;
-use chrono::{format::SecondsFormat, DateTime};
-use futures::{future, stream::TryStreamExt as _};
-use super::{
- header::LastEventId,
- repo::{channels::Id as ChannelId, messages::BroadcastMessage},
-};
-use crate::{
- app::App, clock::RequestedAt, error::BoxedError, error::InternalError,
- login::repo::logins::Login,
-};
+use super::repo::channels::Id as ChannelId;
+use crate::{app::App, clock::RequestedAt, error::InternalError, login::repo::logins::Login};
pub fn router() -> Router<App> {
Router::new()
.route("/create", post(on_create))
.route("/:channel/send", post(on_send))
- .route("/:channel/events", get(on_events))
}
#[derive(serde::Deserialize)]
@@ -60,36 +47,3 @@ async fn on_send(
Ok(Redirect::to(&format!("/{}", channel)))
}
-
-async fn on_events(
- Path(channel): Path<ChannelId>,
- State(app): State<App>,
- _: Login, // requires auth, but doesn't actually care who you are
- last_event_id: Option<TypedHeader<LastEventId>>,
-) -> Result<impl IntoResponse, InternalError> {
- let resume_at = last_event_id
- .map(|TypedHeader(header)| header)
- .map(|LastEventId(header)| header)
- .map(|header| DateTime::parse_from_rfc3339(&header))
- .transpose()?
- .map(|ts| ts.to_utc());
-
- let stream = app
- .channels()
- .events(&channel, resume_at.as_ref())
- .await?
- .and_then(|msg| future::ready(to_event(msg)));
-
- Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default()))
-}
-
-fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
- let data = serde_json::to_string(&msg)?;
- let event = sse::Event::default()
- .id(msg
- .sent_at
- .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
- .data(&data);
-
- Ok(event)
-}
diff --git a/src/cli.rs b/src/cli.rs
index fa7c499..641f99f 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -6,7 +6,7 @@ use clap::Parser;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
use tokio::net;
-use crate::{app::App, channel, clock, error::BoxedError, index, login};
+use crate::{app::App, channel, clock, error::BoxedError, events, index, login};
pub type Result<T> = std::result::Result<T, BoxedError>;
@@ -64,7 +64,7 @@ impl Args {
}
fn routers() -> Router<App> {
- [channel::router(), login::router()]
+ [channel::router(), events::router(), login::router()]
.into_iter()
.fold(index::routes::router(), Router::merge)
}
diff --git a/src/events.rs b/src/events.rs
new file mode 100644
index 0000000..2d1e1f8
--- /dev/null
+++ b/src/events.rs
@@ -0,0 +1,105 @@
+use axum::{
+ extract::State,
+ http::{HeaderName, HeaderValue},
+ response::{
+ sse::{self, Sse},
+ IntoResponse,
+ },
+ routing::get,
+ Router,
+};
+use axum_extra::{extract::Query, typed_header::TypedHeader};
+use chrono::{format::SecondsFormat, DateTime};
+use futures::{
+ future,
+ stream::{self, StreamExt as _, TryStreamExt as _},
+};
+
+use crate::{
+ app::App,
+ channel::repo::{channels::Id as ChannelId, messages::BroadcastMessage},
+ error::BoxedError,
+ error::InternalError,
+ login::repo::logins::Login,
+};
+
+pub fn router() -> Router<App> {
+ Router::new().route("/events", get(on_events))
+}
+
+#[derive(serde::Deserialize)]
+struct EventsQuery {
+ #[serde(default, rename = "channel")]
+ channels: Vec<ChannelId>,
+}
+
+async fn on_events(
+ State(app): State<App>,
+ _: Login, // requires auth, but doesn't actually care who you are
+ last_event_id: Option<TypedHeader<LastEventId>>,
+ Query(query): Query<EventsQuery>,
+) -> Result<impl IntoResponse, InternalError> {
+ let resume_at = last_event_id
+ .map(|TypedHeader(header)| header)
+ .map(|LastEventId(header)| header)
+ .map(|header| DateTime::parse_from_rfc3339(&header))
+ .transpose()?
+ .map(|ts| ts.to_utc());
+
+ let streams = stream::iter(query.channels)
+ .then(|channel| {
+ let app = app.clone();
+ async move { app.channels().events(&channel, resume_at.as_ref()).await }
+ })
+ .try_collect::<Vec<_>>()
+ .await?;
+
+ let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg)));
+ let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default());
+
+ Ok(sse)
+}
+
+fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
+ let data = serde_json::to_string(&msg)?;
+ let event = sse::Event::default()
+ .id(msg
+ .sent_at
+ .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
+ .data(&data);
+
+ Ok(event)
+}
+
+pub struct LastEventId(pub String);
+
+static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");
+
+impl headers::Header for LastEventId {
+ fn name() -> &'static HeaderName {
+ &LAST_EVENT_ID
+ }
+
+ fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
+ where
+ I: Iterator<Item = &'i HeaderValue>,
+ {
+ let value = values.next().ok_or_else(headers::Error::invalid)?;
+ if let Ok(value) = value.to_str() {
+ Ok(Self(value.into()))
+ } else {
+ Err(headers::Error::invalid())
+ }
+ }
+
+ fn encode<E>(&self, values: &mut E)
+ where
+ E: Extend<HeaderValue>,
+ {
+ let Self(value) = self;
+ // Must panic or suppress; the trait provides no other options.
+ let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value");
+
+ values.extend(std::iter::once(value));
+ }
+}
diff --git a/src/index/templates.rs b/src/index/templates.rs
index 7472fd0..a69c19a 100644
--- a/src/index/templates.rs
+++ b/src/index/templates.rs
@@ -105,7 +105,8 @@ pub fn channel(channel: &Channel) -> Markup {
span.sent_at { "(sent_at)" }
")" }
}
- link rel="events" href=(format!("/{}/events", channel.id)) {}
+ meta name="channel" content=(channel.id) {}
+ link rel="events" href=(format!("/events?channel={}", channel.id)) {}
}
body {
section class="messages" {}
diff --git a/src/lib.rs b/src/lib.rs
index d8c9cc0..3813c28 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,6 +3,7 @@ mod channel;
pub mod cli;
mod clock;
mod error;
+mod events;
mod id;
mod index;
mod login;