summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs5
-rw-r--r--src/channel/header.rs34
-rw-r--r--src/channel/mod.rs1
-rw-r--r--src/channel/repo/messages.rs19
-rw-r--r--src/channel/routes.rs54
5 files changed, 21 insertions, 92 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index c0a6d60..5417a5e 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex, MutexGuard};
use futures::{
future,
stream::{self, StreamExt as _, TryStreamExt as _},
- TryStream,
+ Stream,
};
use sqlx::sqlite::SqlitePool;
use tokio::sync::broadcast::{channel, Sender};
@@ -57,7 +57,8 @@ impl<'a> Channels<'a> {
&self,
channel: &ChannelId,
resume_at: Option<&DateTime>,
- ) -> Result<impl TryStream<Ok = BroadcastMessage, Error = BoxedError>, BoxedError> {
+ ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>> + 'static, BoxedError>
+ {
fn skip_stale<E>(
resume_at: Option<&DateTime>,
) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> {
diff --git a/src/channel/header.rs b/src/channel/header.rs
deleted file mode 100644
index eda8214..0000000
--- a/src/channel/header.rs
+++ /dev/null
@@ -1,34 +0,0 @@
-use axum::http::{HeaderName, HeaderValue};
-
-pub struct LastEventId(pub String);
-
-static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");
-
-impl headers::Header for LastEventId {
- fn name() -> &'static HeaderName {
- &LAST_EVENT_ID
- }
-
- fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
- where
- I: Iterator<Item = &'i HeaderValue>,
- {
- let value = values.next().ok_or_else(headers::Error::invalid)?;
- if let Ok(value) = value.to_str() {
- Ok(Self(value.into()))
- } else {
- Err(headers::Error::invalid())
- }
- }
-
- fn encode<E>(&self, values: &mut E)
- where
- E: Extend<HeaderValue>,
- {
- let Self(value) = self;
- // Must panic or suppress; the trait provides no other options.
- let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value");
-
- values.extend(std::iter::once(value));
- }
-}
diff --git a/src/channel/mod.rs b/src/channel/mod.rs
index bc2cc6c..f67ea04 100644
--- a/src/channel/mod.rs
+++ b/src/channel/mod.rs
@@ -1,5 +1,4 @@
pub mod app;
-mod header;
pub mod repo;
mod routes;
diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs
index b465f61..e15a02a 100644
--- a/src/channel/repo/messages.rs
+++ b/src/channel/repo/messages.rs
@@ -26,6 +26,7 @@ pub struct Messages<'t>(&'t mut SqliteConnection);
pub struct BroadcastMessage {
pub id: Id,
pub sender: Login,
+ pub channel: ChannelId,
pub body: String,
pub sent_at: DateTime,
}
@@ -49,6 +50,8 @@ impl<'c> Messages<'c> {
values ($1, $2, $3, $4, $5)
returning
id as "id: Id",
+ sender as "sender: LoginId",
+ channel as "channel: ChannelId",
body,
sent_at as "sent_at: DateTime"
"#,
@@ -58,11 +61,15 @@ impl<'c> Messages<'c> {
body,
sent_at,
)
- .map(|row| BroadcastMessage {
- sender: sender.clone(),
- id: row.id,
- body: row.body,
- sent_at: row.sent_at,
+ .map(|row| {
+ debug_assert!(row.sender == sender.id);
+ BroadcastMessage {
+ id: row.id,
+ sender: sender.clone(),
+ channel: row.channel,
+ body: row.body,
+ sent_at: row.sent_at,
+ }
})
.fetch_one(&mut *self.0)
.await?;
@@ -81,6 +88,7 @@ impl<'c> Messages<'c> {
message.id as "id: Id",
login.id as "sender_id: LoginId",
login.name as sender_name,
+ message.channel as "channel: ChannelId",
message.body,
message.sent_at as "sent_at: DateTime"
from message
@@ -98,6 +106,7 @@ impl<'c> Messages<'c> {
id: row.sender_id,
name: row.sender_name,
},
+ channel: row.channel,
body: row.body,
sent_at: row.sent_at,
})
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index eae68a2..3c2353b 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -1,30 +1,17 @@
use axum::{
extract::{Form, Path, State},
- response::{
- sse::{self, Sse},
- IntoResponse, Redirect,
- },
- routing::{get, post},
+ response::{IntoResponse, Redirect},
+ routing::post,
Router,
};
-use axum_extra::TypedHeader;
-use chrono::{format::SecondsFormat, DateTime};
-use futures::{future, stream::TryStreamExt as _};
-use super::{
- header::LastEventId,
- repo::{channels::Id as ChannelId, messages::BroadcastMessage},
-};
-use crate::{
- app::App, clock::RequestedAt, error::BoxedError, error::InternalError,
- login::repo::logins::Login,
-};
+use super::repo::channels::Id as ChannelId;
+use crate::{app::App, clock::RequestedAt, error::InternalError, login::repo::logins::Login};
pub fn router() -> Router<App> {
Router::new()
.route("/create", post(on_create))
.route("/:channel/send", post(on_send))
- .route("/:channel/events", get(on_events))
}
#[derive(serde::Deserialize)]
@@ -60,36 +47,3 @@ async fn on_send(
Ok(Redirect::to(&format!("/{}", channel)))
}
-
-async fn on_events(
- Path(channel): Path<ChannelId>,
- State(app): State<App>,
- _: Login, // requires auth, but doesn't actually care who you are
- last_event_id: Option<TypedHeader<LastEventId>>,
-) -> Result<impl IntoResponse, InternalError> {
- let resume_at = last_event_id
- .map(|TypedHeader(header)| header)
- .map(|LastEventId(header)| header)
- .map(|header| DateTime::parse_from_rfc3339(&header))
- .transpose()?
- .map(|ts| ts.to_utc());
-
- let stream = app
- .channels()
- .events(&channel, resume_at.as_ref())
- .await?
- .and_then(|msg| future::ready(to_event(msg)));
-
- Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default()))
-}
-
-fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
- let data = serde_json::to_string(&msg)?;
- let event = sse::Event::default()
- .id(msg
- .sent_at
- .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
- .data(&data);
-
- Ok(event)
-}