use axum::{ extract::State, response::{ self, IntoResponse, sse::{self, Sse}, }, }; use axum_extra::extract::Query; use futures::stream::{Stream, StreamExt as _}; use crate::{ app::App, error::{Internal, Unauthorized}, event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId}, token::{app::ValidateError, extract::Identity}, }; pub async fn handler( State(app): State, identity: Identity, last_event_id: Option>, Query(query): Query, ) -> Result + std::fmt::Debug>, Error> { let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner); let stream = app.events().subscribe(resume_at).await?; let stream = app.tokens().limit_stream(identity.token, stream).await?; Ok(Response(stream)) } #[derive(serde::Deserialize)] pub struct QueryParams { pub resume_point: Sequence, } #[derive(Debug)] pub struct Response(pub S); impl IntoResponse for Response where S: Stream + Send + 'static, { fn into_response(self) -> response::Response { let Self(stream) = self; let stream = stream.map(sse::Event::try_from); let heartbeat = match Heartbeat.try_into().map_err(Internal::from) { Ok(heartbeat) => heartbeat, Err(err) => return err.into_response(), }; Sse::new(stream).keep_alive(heartbeat).into_response() } } impl TryFrom for sse::Event { type Error = serde_json::Error; fn try_from(event: Event) -> Result { let id = serde_json::to_string(&event.sequence())?; let data = serde_json::to_string_pretty(&event)?; let event = Self::default().id(id).data(data); Ok(event) } } #[derive(Debug, thiserror::Error)] #[error(transparent)] pub enum Error { Subscribe(#[from] app::Error), Validate(#[from] ValidateError), } impl IntoResponse for Error { fn into_response(self) -> response::Response { match self { Self::Validate(ValidateError::InvalidToken) => Unauthorized.into_response(), other => Internal::from(other).into_response(), } } }