diff options
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 5 | ||||
| -rw-r--r-- | src/channel/header.rs | 34 | ||||
| -rw-r--r-- | src/channel/mod.rs | 1 | ||||
| -rw-r--r-- | src/channel/repo/messages.rs | 19 | ||||
| -rw-r--r-- | src/channel/routes.rs | 54 |
5 files changed, 21 insertions, 92 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index c0a6d60..5417a5e 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex, MutexGuard}; use futures::{ future, stream::{self, StreamExt as _, TryStreamExt as _}, - TryStream, + Stream, }; use sqlx::sqlite::SqlitePool; use tokio::sync::broadcast::{channel, Sender}; @@ -57,7 +57,8 @@ impl<'a> Channels<'a> { &self, channel: &ChannelId, resume_at: Option<&DateTime>, - ) -> Result<impl TryStream<Ok = BroadcastMessage, Error = BoxedError>, BoxedError> { + ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>> + 'static, BoxedError> + { fn skip_stale<E>( resume_at: Option<&DateTime>, ) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> { diff --git a/src/channel/header.rs b/src/channel/header.rs deleted file mode 100644 index eda8214..0000000 --- a/src/channel/header.rs +++ /dev/null @@ -1,34 +0,0 @@ -use axum::http::{HeaderName, HeaderValue}; - -pub struct LastEventId(pub String); - -static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); - -impl headers::Header for LastEventId { - fn name() -> &'static HeaderName { - &LAST_EVENT_ID - } - - fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error> - where - I: Iterator<Item = &'i HeaderValue>, - { - let value = values.next().ok_or_else(headers::Error::invalid)?; - if let Ok(value) = value.to_str() { - Ok(Self(value.into())) - } else { - Err(headers::Error::invalid()) - } - } - - fn encode<E>(&self, values: &mut E) - where - E: Extend<HeaderValue>, - { - let Self(value) = self; - // Must panic or suppress; the trait provides no other options. - let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value"); - - values.extend(std::iter::once(value)); - } -} diff --git a/src/channel/mod.rs b/src/channel/mod.rs index bc2cc6c..f67ea04 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -1,5 +1,4 @@ pub mod app; -mod header; pub mod repo; mod routes; diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs index b465f61..e15a02a 100644 --- a/src/channel/repo/messages.rs +++ b/src/channel/repo/messages.rs @@ -26,6 +26,7 @@ pub struct Messages<'t>(&'t mut SqliteConnection); pub struct BroadcastMessage { pub id: Id, pub sender: Login, + pub channel: ChannelId, pub body: String, pub sent_at: DateTime, } @@ -49,6 +50,8 @@ impl<'c> Messages<'c> { values ($1, $2, $3, $4, $5) returning id as "id: Id", + sender as "sender: LoginId", + channel as "channel: ChannelId", body, sent_at as "sent_at: DateTime" "#, @@ -58,11 +61,15 @@ impl<'c> Messages<'c> { body, sent_at, ) - .map(|row| BroadcastMessage { - sender: sender.clone(), - id: row.id, - body: row.body, - sent_at: row.sent_at, + .map(|row| { + debug_assert!(row.sender == sender.id); + BroadcastMessage { + id: row.id, + sender: sender.clone(), + channel: row.channel, + body: row.body, + sent_at: row.sent_at, + } }) .fetch_one(&mut *self.0) .await?; @@ -81,6 +88,7 @@ impl<'c> Messages<'c> { message.id as "id: Id", login.id as "sender_id: LoginId", login.name as sender_name, + message.channel as "channel: ChannelId", message.body, message.sent_at as "sent_at: DateTime" from message @@ -98,6 +106,7 @@ impl<'c> Messages<'c> { id: row.sender_id, name: row.sender_name, }, + channel: row.channel, body: row.body, sent_at: row.sent_at, }) diff --git a/src/channel/routes.rs b/src/channel/routes.rs index eae68a2..3c2353b 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -1,30 +1,17 @@ use axum::{ extract::{Form, Path, State}, - response::{ - sse::{self, Sse}, - IntoResponse, Redirect, - }, - routing::{get, post}, + response::{IntoResponse, Redirect}, + routing::post, Router, }; -use axum_extra::TypedHeader; -use chrono::{format::SecondsFormat, DateTime}; -use futures::{future, stream::TryStreamExt as _}; -use super::{ - header::LastEventId, - repo::{channels::Id as ChannelId, messages::BroadcastMessage}, -}; -use crate::{ - app::App, clock::RequestedAt, error::BoxedError, error::InternalError, - login::repo::logins::Login, -}; +use super::repo::channels::Id as ChannelId; +use crate::{app::App, clock::RequestedAt, error::InternalError, login::repo::logins::Login}; pub fn router() -> Router<App> { Router::new() .route("/create", post(on_create)) .route("/:channel/send", post(on_send)) - .route("/:channel/events", get(on_events)) } #[derive(serde::Deserialize)] @@ -60,36 +47,3 @@ async fn on_send( Ok(Redirect::to(&format!("/{}", channel))) } - -async fn on_events( - Path(channel): Path<ChannelId>, - State(app): State<App>, - _: Login, // requires auth, but doesn't actually care who you are - last_event_id: Option<TypedHeader<LastEventId>>, -) -> Result<impl IntoResponse, InternalError> { - let resume_at = last_event_id - .map(|TypedHeader(header)| header) - .map(|LastEventId(header)| header) - .map(|header| DateTime::parse_from_rfc3339(&header)) - .transpose()? - .map(|ts| ts.to_utc()); - - let stream = app - .channels() - .events(&channel, resume_at.as_ref()) - .await? - .and_then(|msg| future::ready(to_event(msg))); - - Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default())) -} - -fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> { - let data = serde_json::to_string(&msg)?; - let event = sse::Event::default() - .id(msg - .sent_at - .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) - .data(&data); - - Ok(event) -} |
