diff options
Diffstat (limited to 'src/events/routes.rs')
| -rw-r--r-- | src/events/routes.rs | 69 |
1 files changed, 0 insertions, 69 deletions
diff --git a/src/events/routes.rs b/src/events/routes.rs deleted file mode 100644 index ec9dae2..0000000 --- a/src/events/routes.rs +++ /dev/null @@ -1,69 +0,0 @@ -use axum::{ - extract::State, - response::{ - sse::{self, Sse}, - IntoResponse, Response, - }, - routing::get, - Router, -}; -use futures::stream::{Stream, StreamExt as _}; - -use super::{ - extract::LastEventId, - types::{self, ResumePoint}, -}; -use crate::{app::App, error::Internal, login::extract::Identity}; - -#[cfg(test)] -mod test; - -pub fn router() -> Router<App> { - Router::new().route("/api/events", get(events)) -} - -async fn events( - State(app): State<App>, - identity: Identity, - last_event_id: Option<LastEventId<ResumePoint>>, -) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, Internal> { - let resume_at = last_event_id - .map(LastEventId::into_inner) - .unwrap_or_default(); - - let stream = app.events().subscribe(resume_at).await?; - let stream = app.logins().limit_stream(identity.token, stream); - - Ok(Events(stream)) -} - -#[derive(Debug)] -struct Events<S>(S); - -impl<S> IntoResponse for Events<S> -where - S: Stream<Item = types::ResumableEvent> + Send + 'static, -{ - fn into_response(self) -> Response { - let Self(stream) = self; - let stream = stream.map(sse::Event::try_from); - Sse::new(stream) - .keep_alive(sse::KeepAlive::default()) - .into_response() - } -} - -impl TryFrom<types::ResumableEvent> for sse::Event { - type Error = serde_json::Error; - - fn try_from(value: types::ResumableEvent) -> Result<Self, Self::Error> { - let types::ResumableEvent(resume_at, data) = value; - - let id = serde_json::to_string(&resume_at)?; - let data = serde_json::to_string_pretty(&data)?; - - let event = Self::default().id(id).data(data); - - Ok(event) - } -} |
