summaryrefslogtreecommitdiff
path: root/src/events.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events.rs')
-rw-r--r--src/events.rs132
1 files changed, 0 insertions, 132 deletions
diff --git a/src/events.rs b/src/events.rs
deleted file mode 100644
index 9cbb0a3..0000000
--- a/src/events.rs
+++ /dev/null
@@ -1,132 +0,0 @@
-use axum::{
- extract::State,
- http::StatusCode,
- response::{
- sse::{self, Sse},
- IntoResponse, Response,
- },
- routing::get,
- Router,
-};
-use axum_extra::extract::Query;
-use chrono::{self, format::SecondsFormat, DateTime};
-use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _};
-
-use crate::{
- app::App,
- channel::{app::EventsError, repo::broadcast},
- clock::RequestedAt,
- error::InternalError,
- header::LastEventId,
- repo::{channel, login::Login},
-};
-
-pub fn router() -> Router<App> {
- Router::new().route("/api/events", get(on_events))
-}
-
-#[derive(serde::Deserialize)]
-struct EventsQuery {
- #[serde(default, rename = "channel")]
- channels: Vec<channel::Id>,
-}
-
-async fn on_events(
- State(app): State<App>,
- RequestedAt(now): RequestedAt,
- _: Login, // requires auth, but doesn't actually care who you are
- last_event_id: Option<LastEventId>,
- Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = ChannelEvent<broadcast::Message>>>, ErrorResponse> {
- let resume_at = last_event_id
- .map(|LastEventId(header)| header)
- .map(|header| DateTime::parse_from_rfc3339(&header))
- .transpose()
- // impl From would take more code; this is used once.
- .map_err(ErrorResponse::LastEventIdError)?
- .map(|ts| ts.to_utc());
-
- let streams = stream::iter(query.channels)
- .then(|channel| {
- let app = app.clone();
- async move {
- let events = app
- .channels()
- .events(&channel, &now, resume_at.as_ref())
- .await?
- .map(ChannelEvent::wrap(channel));
-
- Ok::<_, EventsError>(events)
- }
- })
- .try_collect::<Vec<_>>()
- .await
- // impl From would take more code; this is used once.
- .map_err(ErrorResponse::EventsError)?;
-
- let stream = stream::select_all(streams);
-
- Ok(Events(stream))
-}
-
-struct Events<S>(S);
-
-impl<S> IntoResponse for Events<S>
-where
- S: Stream<Item = ChannelEvent<broadcast::Message>> + Send + 'static,
-{
- fn into_response(self) -> Response {
- let Self(stream) = self;
- let stream = stream.map(to_sse_event);
- Sse::new(stream)
- .keep_alive(sse::KeepAlive::default())
- .into_response()
- }
-}
-
-enum ErrorResponse {
- EventsError(EventsError),
- LastEventIdError(chrono::ParseError),
-}
-
-impl IntoResponse for ErrorResponse {
- fn into_response(self) -> Response {
- match self {
- Self::EventsError(not_found @ EventsError::ChannelNotFound(_)) => {
- (StatusCode::NOT_FOUND, not_found.to_string()).into_response()
- }
- Self::EventsError(other) => InternalError::from(other).into_response(),
- Self::LastEventIdError(other) => {
- (StatusCode::BAD_REQUEST, other.to_string()).into_response()
- }
- }
- }
-}
-
-fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, serde_json::Error> {
- let data = serde_json::to_string_pretty(&event)?;
- let event = sse::Event::default()
- .id(event
- .message
- .sent_at
- .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true))
- .data(&data);
-
- Ok(event)
-}
-
-#[derive(serde::Serialize)]
-struct ChannelEvent<M> {
- channel: channel::Id,
- #[serde(flatten)]
- message: M,
-}
-
-impl<M> ChannelEvent<M> {
- fn wrap(channel: channel::Id) -> impl Fn(M) -> Self {
- move |message| Self {
- channel: channel.clone(),
- message,
- }
- }
-}