From 0a05491930fb34ce7c93c33ea0b7599360483fc7 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 20 Sep 2024 23:01:18 -0400 Subject: Push events into a module structure consistent with the rest of the project. --- src/events.rs | 132 ---------------------------------------------------------- 1 file changed, 132 deletions(-) delete mode 100644 src/events.rs (limited to 'src/events.rs') diff --git a/src/events.rs b/src/events.rs deleted file mode 100644 index 9cbb0a3..0000000 --- a/src/events.rs +++ /dev/null @@ -1,132 +0,0 @@ -use axum::{ - extract::State, - http::StatusCode, - response::{ - sse::{self, Sse}, - IntoResponse, Response, - }, - routing::get, - Router, -}; -use axum_extra::extract::Query; -use chrono::{self, format::SecondsFormat, DateTime}; -use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _}; - -use crate::{ - app::App, - channel::{app::EventsError, repo::broadcast}, - clock::RequestedAt, - error::InternalError, - header::LastEventId, - repo::{channel, login::Login}, -}; - -pub fn router() -> Router { - Router::new().route("/api/events", get(on_events)) -} - -#[derive(serde::Deserialize)] -struct EventsQuery { - #[serde(default, rename = "channel")] - channels: Vec, -} - -async fn on_events( - State(app): State, - RequestedAt(now): RequestedAt, - _: Login, // requires auth, but doesn't actually care who you are - last_event_id: Option, - Query(query): Query, -) -> Result>>, ErrorResponse> { - let resume_at = last_event_id - .map(|LastEventId(header)| header) - .map(|header| DateTime::parse_from_rfc3339(&header)) - .transpose() - // impl From would take more code; this is used once. - .map_err(ErrorResponse::LastEventIdError)? - .map(|ts| ts.to_utc()); - - let streams = stream::iter(query.channels) - .then(|channel| { - let app = app.clone(); - async move { - let events = app - .channels() - .events(&channel, &now, resume_at.as_ref()) - .await? - .map(ChannelEvent::wrap(channel)); - - Ok::<_, EventsError>(events) - } - }) - .try_collect::>() - .await - // impl From would take more code; this is used once. - .map_err(ErrorResponse::EventsError)?; - - let stream = stream::select_all(streams); - - Ok(Events(stream)) -} - -struct Events(S); - -impl IntoResponse for Events -where - S: Stream> + Send + 'static, -{ - fn into_response(self) -> Response { - let Self(stream) = self; - let stream = stream.map(to_sse_event); - Sse::new(stream) - .keep_alive(sse::KeepAlive::default()) - .into_response() - } -} - -enum ErrorResponse { - EventsError(EventsError), - LastEventIdError(chrono::ParseError), -} - -impl IntoResponse for ErrorResponse { - fn into_response(self) -> Response { - match self { - Self::EventsError(not_found @ EventsError::ChannelNotFound(_)) => { - (StatusCode::NOT_FOUND, not_found.to_string()).into_response() - } - Self::EventsError(other) => InternalError::from(other).into_response(), - Self::LastEventIdError(other) => { - (StatusCode::BAD_REQUEST, other.to_string()).into_response() - } - } - } -} - -fn to_sse_event(event: ChannelEvent) -> Result { - let data = serde_json::to_string_pretty(&event)?; - let event = sse::Event::default() - .id(event - .message - .sent_at - .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) - .data(&data); - - Ok(event) -} - -#[derive(serde::Serialize)] -struct ChannelEvent { - channel: channel::Id, - #[serde(flatten)] - message: M, -} - -impl ChannelEvent { - fn wrap(channel: channel::Id) -> impl Fn(M) -> Self { - move |message| Self { - channel: channel.clone(), - message, - } - } -} -- cgit v1.2.3