use axum::{ extract::State, response::{ sse::{self, Sse}, IntoResponse, Response, }, routing::get, Router, }; use futures::stream::{Stream, StreamExt as _}; use super::{ extract::LastEventId, types::{self, ResumePoint}, }; use crate::{app::App, error::Internal, login::extract::Identity}; #[cfg(test)] mod test; pub fn router() -> Router { Router::new().route("/api/events", get(events)) } async fn events( State(app): State, identity: Identity, last_event_id: Option>, ) -> Result + std::fmt::Debug>, Internal> { let resume_at = last_event_id .map(LastEventId::into_inner) .unwrap_or_default(); let stream = app.events().subscribe(resume_at).await?; let stream = app.logins().limit_stream(identity.token, stream); Ok(Events(stream)) } #[derive(Debug)] struct Events(S); impl IntoResponse for Events where S: Stream + Send + 'static, { fn into_response(self) -> Response { let Self(stream) = self; let stream = stream.map(sse::Event::try_from); Sse::new(stream) .keep_alive(sse::KeepAlive::default()) .into_response() } } impl TryFrom for sse::Event { type Error = serde_json::Error; fn try_from(value: types::ResumableEvent) -> Result { let types::ResumableEvent(resume_at, data) = value; let id = serde_json::to_string(&resume_at)?; let data = serde_json::to_string_pretty(&data)?; let event = Self::default().id(id).data(data); Ok(event) } }