summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/channel/app.rs5
-rw-r--r--src/channel/header.rs34
-rw-r--r--src/channel/mod.rs1
-rw-r--r--src/channel/repo/messages.rs19
-rw-r--r--src/channel/routes.rs54
-rw-r--r--src/cli.rs4
-rw-r--r--src/events.rs105
-rw-r--r--src/index/templates.rs3
-rw-r--r--src/lib.rs1
9 files changed, 131 insertions, 95 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index c0a6d60..5417a5e 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex, MutexGuard};
use futures::{
future,
stream::{self, StreamExt as _, TryStreamExt as _},
- TryStream,
+ Stream,
};
use sqlx::sqlite::SqlitePool;
use tokio::sync::broadcast::{channel, Sender};
@@ -57,7 +57,8 @@ impl<'a> Channels<'a> {
&self,
channel: &ChannelId,
resume_at: Option<&DateTime>,
- ) -> Result<impl TryStream<Ok = BroadcastMessage, Error = BoxedError>, BoxedError> {
+ ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>> + 'static, BoxedError>
+ {
fn skip_stale<E>(
resume_at: Option<&DateTime>,
) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> {
diff --git a/src/channel/header.rs b/src/channel/header.rs
deleted file mode 100644
index eda8214..0000000
--- a/src/channel/header.rs
+++ /dev/null
@@ -1,34 +0,0 @@
-use axum::http::{HeaderName, HeaderValue};
-
-pub struct LastEventId(pub String);
-
-static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");
-
-impl headers::Header for LastEventId {
- fn name() -> &'static HeaderName {
- &LAST_EVENT_ID
- }
-
- fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
- where
- I: Iterator<Item = &'i HeaderValue>,
- {
- let value = values.next().ok_or_else(headers::Error::invalid)?;
- if let Ok(value) = value.to_str() {
- Ok(Self(value.into()))
- } else {
- Err(headers::Error::invalid())
- }
- }
-
- fn encode<E>(&self, values: &mut E)
- where
- E: Extend<HeaderValue>,
- {
- let Self(value) = self;
- // Must panic or suppress; the trait provides no other options.
- let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value");
-
- values.extend(std::iter::once(value));
- }
-}
diff --git a/src/channel/mod.rs b/src/channel/mod.rs
index bc2cc6c..f67ea04 100644
--- a/src/channel/mod.rs
+++ b/src/channel/mod.rs
@@ -1,5 +1,4 @@
pub mod app;
-mod header;
pub mod repo;
mod routes;
diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs
index b465f61..e15a02a 100644
--- a/src/channel/repo/messages.rs
+++ b/src/channel/repo/messages.rs
@@ -26,6 +26,7 @@ pub struct Messages<'t>(&'t mut SqliteConnection);
pub struct BroadcastMessage {
pub id: Id,
pub sender: Login,
+ pub channel: ChannelId,
pub body: String,
pub sent_at: DateTime,
}
@@ -49,6 +50,8 @@ impl<'c> Messages<'c> {
values ($1, $2, $3, $4, $5)
returning
id as "id: Id",
+ sender as "sender: LoginId",
+ channel as "channel: ChannelId",
body,
sent_at as "sent_at: DateTime"
"#,
@@ -58,11 +61,15 @@ impl<'c> Messages<'c> {
body,
sent_at,
)
- .map(|row| BroadcastMessage {
- sender: sender.clone(),
- id: row.id,
- body: row.body,
- sent_at: row.sent_at,
+ .map(|row| {
+ debug_assert!(row.sender == sender.id);
+ BroadcastMessage {
+ id: row.id,
+ sender: sender.clone(),
+ channel: row.channel,
+ body: row.body,
+ sent_at: row.sent_at,
+ }
})
.fetch_one(&mut *self.0)
.await?;
@@ -81,6 +88,7 @@ impl<'c> Messages<'c> {
message.id as "id: Id",
login.id as "sender_id: LoginId",
login.name as sender_name,
+ message.channel as "channel: ChannelId",
message.body,
message.sent_at as "sent_at: DateTime"
from message
@@ -98,6 +106,7 @@ impl<'c> Messages<'c> {
id: row.sender_id,
name: row.sender_name,
},
+ channel: row.channel,
body: row.body,
sent_at: row.sent_at,
})
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index eae68a2..3c2353b 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -1,30 +1,17 @@
use axum::{
extract::{Form, Path, State},
- response::{
- sse::{self, Sse},
- IntoResponse, Redirect,
- },
- routing::{get, post},
+ response::{IntoResponse, Redirect},
+ routing::post,
Router,
};
-use axum_extra::TypedHeader;
-use chrono::{format::SecondsFormat, DateTime};
-use futures::{future, stream::TryStreamExt as _};
-use super::{
- header::LastEventId,
- repo::{channels::Id as ChannelId, messages::BroadcastMessage},
-};
-use crate::{
- app::App, clock::RequestedAt, error::BoxedError, error::InternalError,
- login::repo::logins::Login,
-};
+use super::repo::channels::Id as ChannelId;
+use crate::{app::App, clock::RequestedAt, error::InternalError, login::repo::logins::Login};
pub fn router() -> Router<App> {
Router::new()
.route("/create", post(on_create))
.route("/:channel/send", post(on_send))
- .route("/:channel/events", get(on_events))
}
#[derive(serde::Deserialize)]
@@ -60,36 +47,3 @@ async fn on_send(
Ok(Redirect::to(&format!("/{}", channel)))
}
-
-async fn on_events(
- Path(channel): Path<ChannelId>,
- State(app): State<App>,
- _: Login, // requires auth, but doesn't actually care who you are
- last_event_id: Option<TypedHeader<LastEventId>>,
-) -> Result<impl IntoResponse, InternalError> {
- let resume_at = last_event_id
- .map(|TypedHeader(header)| header)
- .map(|LastEventId(header)| header)
- .map(|header| DateTime::parse_from_rfc3339(&header))
- .transpose()?
- .map(|ts| ts.to_utc());
-
- let stream = app
- .channels()
- .events(&channel, resume_at.as_ref())
- .await?
- .and_then(|msg| future::ready(to_event(msg)));
-
- Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default()))
-}
-
-fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
- let data = serde_json::to_string(&msg)?;
- let event = sse::Event::default()
- .id(msg
- .sent_at
- .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
- .data(&data);
-
- Ok(event)
-}
diff --git a/src/cli.rs b/src/cli.rs
index fa7c499..641f99f 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -6,7 +6,7 @@ use clap::Parser;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
use tokio::net;
-use crate::{app::App, channel, clock, error::BoxedError, index, login};
+use crate::{app::App, channel, clock, error::BoxedError, events, index, login};
pub type Result<T> = std::result::Result<T, BoxedError>;
@@ -64,7 +64,7 @@ impl Args {
}
fn routers() -> Router<App> {
- [channel::router(), login::router()]
+ [channel::router(), events::router(), login::router()]
.into_iter()
.fold(index::routes::router(), Router::merge)
}
diff --git a/src/events.rs b/src/events.rs
new file mode 100644
index 0000000..2d1e1f8
--- /dev/null
+++ b/src/events.rs
@@ -0,0 +1,105 @@
+use axum::{
+ extract::State,
+ http::{HeaderName, HeaderValue},
+ response::{
+ sse::{self, Sse},
+ IntoResponse,
+ },
+ routing::get,
+ Router,
+};
+use axum_extra::{extract::Query, typed_header::TypedHeader};
+use chrono::{format::SecondsFormat, DateTime};
+use futures::{
+ future,
+ stream::{self, StreamExt as _, TryStreamExt as _},
+};
+
+use crate::{
+ app::App,
+ channel::repo::{channels::Id as ChannelId, messages::BroadcastMessage},
+ error::BoxedError,
+ error::InternalError,
+ login::repo::logins::Login,
+};
+
+pub fn router() -> Router<App> {
+ Router::new().route("/events", get(on_events))
+}
+
+#[derive(serde::Deserialize)]
+struct EventsQuery {
+ #[serde(default, rename = "channel")]
+ channels: Vec<ChannelId>,
+}
+
+async fn on_events(
+ State(app): State<App>,
+ _: Login, // requires auth, but doesn't actually care who you are
+ last_event_id: Option<TypedHeader<LastEventId>>,
+ Query(query): Query<EventsQuery>,
+) -> Result<impl IntoResponse, InternalError> {
+ let resume_at = last_event_id
+ .map(|TypedHeader(header)| header)
+ .map(|LastEventId(header)| header)
+ .map(|header| DateTime::parse_from_rfc3339(&header))
+ .transpose()?
+ .map(|ts| ts.to_utc());
+
+ let streams = stream::iter(query.channels)
+ .then(|channel| {
+ let app = app.clone();
+ async move { app.channels().events(&channel, resume_at.as_ref()).await }
+ })
+ .try_collect::<Vec<_>>()
+ .await?;
+
+ let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg)));
+ let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default());
+
+ Ok(sse)
+}
+
+fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
+ let data = serde_json::to_string(&msg)?;
+ let event = sse::Event::default()
+ .id(msg
+ .sent_at
+ .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
+ .data(&data);
+
+ Ok(event)
+}
+
+pub struct LastEventId(pub String);
+
+static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");
+
+impl headers::Header for LastEventId {
+ fn name() -> &'static HeaderName {
+ &LAST_EVENT_ID
+ }
+
+ fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
+ where
+ I: Iterator<Item = &'i HeaderValue>,
+ {
+ let value = values.next().ok_or_else(headers::Error::invalid)?;
+ if let Ok(value) = value.to_str() {
+ Ok(Self(value.into()))
+ } else {
+ Err(headers::Error::invalid())
+ }
+ }
+
+ fn encode<E>(&self, values: &mut E)
+ where
+ E: Extend<HeaderValue>,
+ {
+ let Self(value) = self;
+ // Must panic or suppress; the trait provides no other options.
+ let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value");
+
+ values.extend(std::iter::once(value));
+ }
+}
diff --git a/src/index/templates.rs b/src/index/templates.rs
index 7472fd0..a69c19a 100644
--- a/src/index/templates.rs
+++ b/src/index/templates.rs
@@ -105,7 +105,8 @@ pub fn channel(channel: &Channel) -> Markup {
span.sent_at { "(sent_at)" }
")" }
}
- link rel="events" href=(format!("/{}/events", channel.id)) {}
+ meta name="channel" content=(channel.id) {}
+ link rel="events" href=(format!("/events?channel={}", channel.id)) {}
}
body {
section class="messages" {}
diff --git a/src/lib.rs b/src/lib.rs
index d8c9cc0..3813c28 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,6 +3,7 @@ mod channel;
pub mod cli;
mod clock;
mod error;
+mod events;
mod id;
mod index;
mod login;