diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/channel/app.rs | 5 | ||||
| -rw-r--r-- | src/channel/header.rs | 34 | ||||
| -rw-r--r-- | src/channel/mod.rs | 1 | ||||
| -rw-r--r-- | src/channel/repo/messages.rs | 19 | ||||
| -rw-r--r-- | src/channel/routes.rs | 54 | ||||
| -rw-r--r-- | src/cli.rs | 4 | ||||
| -rw-r--r-- | src/events.rs | 105 | ||||
| -rw-r--r-- | src/index/templates.rs | 3 | ||||
| -rw-r--r-- | src/lib.rs | 1 |
9 files changed, 131 insertions, 95 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index c0a6d60..5417a5e 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex, MutexGuard}; use futures::{ future, stream::{self, StreamExt as _, TryStreamExt as _}, - TryStream, + Stream, }; use sqlx::sqlite::SqlitePool; use tokio::sync::broadcast::{channel, Sender}; @@ -57,7 +57,8 @@ impl<'a> Channels<'a> { &self, channel: &ChannelId, resume_at: Option<&DateTime>, - ) -> Result<impl TryStream<Ok = BroadcastMessage, Error = BoxedError>, BoxedError> { + ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>> + 'static, BoxedError> + { fn skip_stale<E>( resume_at: Option<&DateTime>, ) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> { diff --git a/src/channel/header.rs b/src/channel/header.rs deleted file mode 100644 index eda8214..0000000 --- a/src/channel/header.rs +++ /dev/null @@ -1,34 +0,0 @@ -use axum::http::{HeaderName, HeaderValue}; - -pub struct LastEventId(pub String); - -static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); - -impl headers::Header for LastEventId { - fn name() -> &'static HeaderName { - &LAST_EVENT_ID - } - - fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error> - where - I: Iterator<Item = &'i HeaderValue>, - { - let value = values.next().ok_or_else(headers::Error::invalid)?; - if let Ok(value) = value.to_str() { - Ok(Self(value.into())) - } else { - Err(headers::Error::invalid()) - } - } - - fn encode<E>(&self, values: &mut E) - where - E: Extend<HeaderValue>, - { - let Self(value) = self; - // Must panic or suppress; the trait provides no other options. - let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value"); - - values.extend(std::iter::once(value)); - } -} diff --git a/src/channel/mod.rs b/src/channel/mod.rs index bc2cc6c..f67ea04 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -1,5 +1,4 @@ pub mod app; -mod header; pub mod repo; mod routes; diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs index b465f61..e15a02a 100644 --- a/src/channel/repo/messages.rs +++ b/src/channel/repo/messages.rs @@ -26,6 +26,7 @@ pub struct Messages<'t>(&'t mut SqliteConnection); pub struct BroadcastMessage { pub id: Id, pub sender: Login, + pub channel: ChannelId, pub body: String, pub sent_at: DateTime, } @@ -49,6 +50,8 @@ impl<'c> Messages<'c> { values ($1, $2, $3, $4, $5) returning id as "id: Id", + sender as "sender: LoginId", + channel as "channel: ChannelId", body, sent_at as "sent_at: DateTime" "#, @@ -58,11 +61,15 @@ impl<'c> Messages<'c> { body, sent_at, ) - .map(|row| BroadcastMessage { - sender: sender.clone(), - id: row.id, - body: row.body, - sent_at: row.sent_at, + .map(|row| { + debug_assert!(row.sender == sender.id); + BroadcastMessage { + id: row.id, + sender: sender.clone(), + channel: row.channel, + body: row.body, + sent_at: row.sent_at, + } }) .fetch_one(&mut *self.0) .await?; @@ -81,6 +88,7 @@ impl<'c> Messages<'c> { message.id as "id: Id", login.id as "sender_id: LoginId", login.name as sender_name, + message.channel as "channel: ChannelId", message.body, message.sent_at as "sent_at: DateTime" from message @@ -98,6 +106,7 @@ impl<'c> Messages<'c> { id: row.sender_id, name: row.sender_name, }, + channel: row.channel, body: row.body, sent_at: row.sent_at, }) diff --git a/src/channel/routes.rs b/src/channel/routes.rs index eae68a2..3c2353b 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -1,30 +1,17 @@ use axum::{ extract::{Form, Path, State}, - response::{ - sse::{self, Sse}, - IntoResponse, Redirect, - }, - routing::{get, post}, + response::{IntoResponse, Redirect}, + routing::post, Router, }; -use axum_extra::TypedHeader; -use chrono::{format::SecondsFormat, DateTime}; -use futures::{future, stream::TryStreamExt as _}; -use super::{ - header::LastEventId, - repo::{channels::Id as ChannelId, messages::BroadcastMessage}, -}; -use crate::{ - app::App, clock::RequestedAt, error::BoxedError, error::InternalError, - login::repo::logins::Login, -}; +use super::repo::channels::Id as ChannelId; +use crate::{app::App, clock::RequestedAt, error::InternalError, login::repo::logins::Login}; pub fn router() -> Router<App> { Router::new() .route("/create", post(on_create)) .route("/:channel/send", post(on_send)) - .route("/:channel/events", get(on_events)) } #[derive(serde::Deserialize)] @@ -60,36 +47,3 @@ async fn on_send( Ok(Redirect::to(&format!("/{}", channel))) } - -async fn on_events( - Path(channel): Path<ChannelId>, - State(app): State<App>, - _: Login, // requires auth, but doesn't actually care who you are - last_event_id: Option<TypedHeader<LastEventId>>, -) -> Result<impl IntoResponse, InternalError> { - let resume_at = last_event_id - .map(|TypedHeader(header)| header) - .map(|LastEventId(header)| header) - .map(|header| DateTime::parse_from_rfc3339(&header)) - .transpose()? - .map(|ts| ts.to_utc()); - - let stream = app - .channels() - .events(&channel, resume_at.as_ref()) - .await? - .and_then(|msg| future::ready(to_event(msg))); - - Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default())) -} - -fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> { - let data = serde_json::to_string(&msg)?; - let event = sse::Event::default() - .id(msg - .sent_at - .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) - .data(&data); - - Ok(event) -} @@ -6,7 +6,7 @@ use clap::Parser; use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; use tokio::net; -use crate::{app::App, channel, clock, error::BoxedError, index, login}; +use crate::{app::App, channel, clock, error::BoxedError, events, index, login}; pub type Result<T> = std::result::Result<T, BoxedError>; @@ -64,7 +64,7 @@ impl Args { } fn routers() -> Router<App> { - [channel::router(), login::router()] + [channel::router(), events::router(), login::router()] .into_iter() .fold(index::routes::router(), Router::merge) } diff --git a/src/events.rs b/src/events.rs new file mode 100644 index 0000000..2d1e1f8 --- /dev/null +++ b/src/events.rs @@ -0,0 +1,105 @@ +use axum::{ + extract::State, + http::{HeaderName, HeaderValue}, + response::{ + sse::{self, Sse}, + IntoResponse, + }, + routing::get, + Router, +}; +use axum_extra::{extract::Query, typed_header::TypedHeader}; +use chrono::{format::SecondsFormat, DateTime}; +use futures::{ + future, + stream::{self, StreamExt as _, TryStreamExt as _}, +}; + +use crate::{ + app::App, + channel::repo::{channels::Id as ChannelId, messages::BroadcastMessage}, + error::BoxedError, + error::InternalError, + login::repo::logins::Login, +}; + +pub fn router() -> Router<App> { + Router::new().route("/events", get(on_events)) +} + +#[derive(serde::Deserialize)] +struct EventsQuery { + #[serde(default, rename = "channel")] + channels: Vec<ChannelId>, +} + +async fn on_events( + State(app): State<App>, + _: Login, // requires auth, but doesn't actually care who you are + last_event_id: Option<TypedHeader<LastEventId>>, + Query(query): Query<EventsQuery>, +) -> Result<impl IntoResponse, InternalError> { + let resume_at = last_event_id + .map(|TypedHeader(header)| header) + .map(|LastEventId(header)| header) + .map(|header| DateTime::parse_from_rfc3339(&header)) + .transpose()? + .map(|ts| ts.to_utc()); + + let streams = stream::iter(query.channels) + .then(|channel| { + let app = app.clone(); + async move { app.channels().events(&channel, resume_at.as_ref()).await } + }) + .try_collect::<Vec<_>>() + .await?; + + let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg))); + let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default()); + + Ok(sse) +} + +fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> { + let data = serde_json::to_string(&msg)?; + let event = sse::Event::default() + .id(msg + .sent_at + .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) + .data(&data); + + Ok(event) +} + +pub struct LastEventId(pub String); + +static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); + +impl headers::Header for LastEventId { + fn name() -> &'static HeaderName { + &LAST_EVENT_ID + } + + fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error> + where + I: Iterator<Item = &'i HeaderValue>, + { + let value = values.next().ok_or_else(headers::Error::invalid)?; + if let Ok(value) = value.to_str() { + Ok(Self(value.into())) + } else { + Err(headers::Error::invalid()) + } + } + + fn encode<E>(&self, values: &mut E) + where + E: Extend<HeaderValue>, + { + let Self(value) = self; + // Must panic or suppress; the trait provides no other options. + let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value"); + + values.extend(std::iter::once(value)); + } +} diff --git a/src/index/templates.rs b/src/index/templates.rs index 7472fd0..a69c19a 100644 --- a/src/index/templates.rs +++ b/src/index/templates.rs @@ -105,7 +105,8 @@ pub fn channel(channel: &Channel) -> Markup { span.sent_at { "(sent_at)" } ")" } } - link rel="events" href=(format!("/{}/events", channel.id)) {} + meta name="channel" content=(channel.id) {} + link rel="events" href=(format!("/events?channel={}", channel.id)) {} } body { section class="messages" {} @@ -3,6 +3,7 @@ mod channel; pub mod cli; mod clock; mod error; +mod events; mod id; mod index; mod login; |
