summaryrefslogtreecommitdiff
path: root/src/events.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events.rs')
-rw-r--r--src/events.rs105
1 files changed, 105 insertions, 0 deletions
diff --git a/src/events.rs b/src/events.rs
new file mode 100644
index 0000000..2d1e1f8
--- /dev/null
+++ b/src/events.rs
@@ -0,0 +1,105 @@
+use axum::{
+ extract::State,
+ http::{HeaderName, HeaderValue},
+ response::{
+ sse::{self, Sse},
+ IntoResponse,
+ },
+ routing::get,
+ Router,
+};
+use axum_extra::{extract::Query, typed_header::TypedHeader};
+use chrono::{format::SecondsFormat, DateTime};
+use futures::{
+ future,
+ stream::{self, StreamExt as _, TryStreamExt as _},
+};
+
+use crate::{
+ app::App,
+ channel::repo::{channels::Id as ChannelId, messages::BroadcastMessage},
+ error::BoxedError,
+ error::InternalError,
+ login::repo::logins::Login,
+};
+
+pub fn router() -> Router<App> {
+ Router::new().route("/events", get(on_events))
+}
+
+#[derive(serde::Deserialize)]
+struct EventsQuery {
+ #[serde(default, rename = "channel")]
+ channels: Vec<ChannelId>,
+}
+
+async fn on_events(
+ State(app): State<App>,
+ _: Login, // requires auth, but doesn't actually care who you are
+ last_event_id: Option<TypedHeader<LastEventId>>,
+ Query(query): Query<EventsQuery>,
+) -> Result<impl IntoResponse, InternalError> {
+ let resume_at = last_event_id
+ .map(|TypedHeader(header)| header)
+ .map(|LastEventId(header)| header)
+ .map(|header| DateTime::parse_from_rfc3339(&header))
+ .transpose()?
+ .map(|ts| ts.to_utc());
+
+ let streams = stream::iter(query.channels)
+ .then(|channel| {
+ let app = app.clone();
+ async move { app.channels().events(&channel, resume_at.as_ref()).await }
+ })
+ .try_collect::<Vec<_>>()
+ .await?;
+
+ let stream = stream::select_all(streams).and_then(|msg| future::ready(to_event(msg)));
+ let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default());
+
+ Ok(sse)
+}
+
+fn to_event(msg: BroadcastMessage) -> Result<sse::Event, BoxedError> {
+ let data = serde_json::to_string(&msg)?;
+ let event = sse::Event::default()
+ .id(msg
+ .sent_at
+ .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
+ .data(&data);
+
+ Ok(event)
+}
+
+pub struct LastEventId(pub String);
+
+static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");
+
+impl headers::Header for LastEventId {
+ fn name() -> &'static HeaderName {
+ &LAST_EVENT_ID
+ }
+
+ fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
+ where
+ I: Iterator<Item = &'i HeaderValue>,
+ {
+ let value = values.next().ok_or_else(headers::Error::invalid)?;
+ if let Ok(value) = value.to_str() {
+ Ok(Self(value.into()))
+ } else {
+ Err(headers::Error::invalid())
+ }
+ }
+
+ fn encode<E>(&self, values: &mut E)
+ where
+ E: Extend<HeaderValue>,
+ {
+ let Self(value) = self;
+ // Must panic or suppress; the trait provides no other options.
+ let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value");
+
+ values.extend(std::iter::once(value));
+ }
+}