diff options
Diffstat (limited to 'src/events.rs')
| -rw-r--r-- | src/events.rs | 132 |
1 files changed, 0 insertions, 132 deletions
diff --git a/src/events.rs b/src/events.rs deleted file mode 100644 index 9cbb0a3..0000000 --- a/src/events.rs +++ /dev/null @@ -1,132 +0,0 @@ -use axum::{ - extract::State, - http::StatusCode, - response::{ - sse::{self, Sse}, - IntoResponse, Response, - }, - routing::get, - Router, -}; -use axum_extra::extract::Query; -use chrono::{self, format::SecondsFormat, DateTime}; -use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _}; - -use crate::{ - app::App, - channel::{app::EventsError, repo::broadcast}, - clock::RequestedAt, - error::InternalError, - header::LastEventId, - repo::{channel, login::Login}, -}; - -pub fn router() -> Router<App> { - Router::new().route("/api/events", get(on_events)) -} - -#[derive(serde::Deserialize)] -struct EventsQuery { - #[serde(default, rename = "channel")] - channels: Vec<channel::Id>, -} - -async fn on_events( - State(app): State<App>, - RequestedAt(now): RequestedAt, - _: Login, // requires auth, but doesn't actually care who you are - last_event_id: Option<LastEventId>, - Query(query): Query<EventsQuery>, -) -> Result<Events<impl Stream<Item = ChannelEvent<broadcast::Message>>>, ErrorResponse> { - let resume_at = last_event_id - .map(|LastEventId(header)| header) - .map(|header| DateTime::parse_from_rfc3339(&header)) - .transpose() - // impl From would take more code; this is used once. - .map_err(ErrorResponse::LastEventIdError)? - .map(|ts| ts.to_utc()); - - let streams = stream::iter(query.channels) - .then(|channel| { - let app = app.clone(); - async move { - let events = app - .channels() - .events(&channel, &now, resume_at.as_ref()) - .await? - .map(ChannelEvent::wrap(channel)); - - Ok::<_, EventsError>(events) - } - }) - .try_collect::<Vec<_>>() - .await - // impl From would take more code; this is used once. - .map_err(ErrorResponse::EventsError)?; - - let stream = stream::select_all(streams); - - Ok(Events(stream)) -} - -struct Events<S>(S); - -impl<S> IntoResponse for Events<S> -where - S: Stream<Item = ChannelEvent<broadcast::Message>> + Send + 'static, -{ - fn into_response(self) -> Response { - let Self(stream) = self; - let stream = stream.map(to_sse_event); - Sse::new(stream) - .keep_alive(sse::KeepAlive::default()) - .into_response() - } -} - -enum ErrorResponse { - EventsError(EventsError), - LastEventIdError(chrono::ParseError), -} - -impl IntoResponse for ErrorResponse { - fn into_response(self) -> Response { - match self { - Self::EventsError(not_found @ EventsError::ChannelNotFound(_)) => { - (StatusCode::NOT_FOUND, not_found.to_string()).into_response() - } - Self::EventsError(other) => InternalError::from(other).into_response(), - Self::LastEventIdError(other) => { - (StatusCode::BAD_REQUEST, other.to_string()).into_response() - } - } - } -} - -fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, serde_json::Error> { - let data = serde_json::to_string_pretty(&event)?; - let event = sse::Event::default() - .id(event - .message - .sent_at - .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)) - .data(&data); - - Ok(event) -} - -#[derive(serde::Serialize)] -struct ChannelEvent<M> { - channel: channel::Id, - #[serde(flatten)] - message: M, -} - -impl<M> ChannelEvent<M> { - fn wrap(channel: channel::Id) -> impl Fn(M) -> Self { - move |message| Self { - channel: channel.clone(), - message, - } - } -} |
