diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-16 20:14:33 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-16 20:14:33 -0400 |
| commit | ea74daca4809e4008dd8d01039db9fff3be659d9 (patch) | |
| tree | 5972cabf646e8d5e635e9e2a176bff56c178461a /src/event/routes.rs | |
| parent | 56e16e29db55dae84549229d24b971f8bcf7da21 (diff) | |
Organizational pass on endpoints and routes.
Diffstat (limited to 'src/event/routes.rs')
| -rw-r--r-- | src/event/routes.rs | 92 |
1 files changed, 0 insertions, 92 deletions
diff --git a/src/event/routes.rs b/src/event/routes.rs deleted file mode 100644 index de6d248..0000000 --- a/src/event/routes.rs +++ /dev/null @@ -1,92 +0,0 @@ -use axum::{ - extract::State, - response::{ - sse::{self, Sse}, - IntoResponse, Response, - }, - routing::get, - Router, -}; -use axum_extra::extract::Query; -use futures::stream::{Stream, StreamExt as _}; - -use super::{extract::LastEventId, Event}; -use crate::{ - app::App, - error::{Internal, Unauthorized}, - event::{ResumePoint, Sequence, Sequenced as _}, - token::{app::ValidateError, extract::Identity}, -}; - -#[cfg(test)] -mod test; - -pub fn router() -> Router<App> { - Router::new().route("/api/events", get(events)) -} - -#[derive(Default, serde::Deserialize)] -struct EventsQuery { - resume_point: ResumePoint, -} - -async fn events( - State(app): State<App>, - identity: Identity, - last_event_id: Option<LastEventId<Sequence>>, - Query(query): Query<EventsQuery>, -) -> Result<Events<impl Stream<Item = Event> + std::fmt::Debug>, EventsError> { - let resume_at = last_event_id - .map(LastEventId::into_inner) - .or(query.resume_point); - - let stream = app.events().subscribe(resume_at).await?; - let stream = app.tokens().limit_stream(identity.token, stream).await?; - - Ok(Events(stream)) -} - -#[derive(Debug)] -struct Events<S>(S); - -impl<S> IntoResponse for Events<S> -where - S: Stream<Item = Event> + Send + 'static, -{ - fn into_response(self) -> Response { - let Self(stream) = self; - let stream = stream.map(sse::Event::try_from); - Sse::new(stream) - .keep_alive(sse::KeepAlive::default()) - .into_response() - } -} - -impl TryFrom<Event> for sse::Event { - type Error = serde_json::Error; - - fn try_from(event: Event) -> Result<Self, Self::Error> { - let id = serde_json::to_string(&event.sequence())?; - let data = serde_json::to_string_pretty(&event)?; - - let event = Self::default().id(id).data(data); - - Ok(event) - } -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum EventsError { - DatabaseError(#[from] sqlx::Error), - ValidateError(#[from] ValidateError), -} - -impl IntoResponse for EventsError { - fn into_response(self) -> Response { - match self { - Self::ValidateError(ValidateError::InvalidToken) => Unauthorized.into_response(), - other => Internal::from(other).into_response(), - } - } -} |
