diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-06-17 02:11:45 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-06-18 18:31:40 -0400 |
| commit | 4e3d5ccac99b24934c972e088cd7eb02bb95df06 (patch) | |
| tree | c94f5a42f7e734b81892c1289a1d2b566706ba7c /src/event/routes/get.rs | |
| parent | 5ed96f8e8b9d9f19ee249f5c73a5a21ef6bca09f (diff) | |
Handlers are _named operations_, which can be exposed via routes.
Each domain module that exposes handlers does so through a `handlers` child module, ideally as a top-level symbol that can be plugged directly into Axum's `MethodRouter`. Modules could make exceptions to this - kill the doctrinaire inside yourself, after all - but none of the API modules that actually exist need such exceptions, and consistency is useful.
The related details of request types, URL types, response types, errors, &c &c are then organized into modules under `handlers`, along with their respective tests.
Diffstat (limited to 'src/event/routes/get.rs')
| -rw-r--r-- | src/event/routes/get.rs | 82 |
1 files changed, 0 insertions, 82 deletions
diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs deleted file mode 100644 index f6c91fa..0000000 --- a/src/event/routes/get.rs +++ /dev/null @@ -1,82 +0,0 @@ -use axum::{ - extract::State, - response::{ - self, IntoResponse, - sse::{self, Sse}, - }, -}; -use axum_extra::extract::Query; -use futures::stream::{Stream, StreamExt as _}; - -use crate::{ - app::App, - error::{Internal, Unauthorized}, - event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId}, - token::{app::ValidateError, extract::Identity}, -}; - -pub async fn handler( - State(app): State<App>, - identity: Identity, - last_event_id: Option<LastEventId<Sequence>>, - Query(query): Query<QueryParams>, -) -> Result<Response<impl Stream<Item = Event> + std::fmt::Debug>, Error> { - let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner); - - let stream = app.events().subscribe(resume_at).await?; - let stream = app.tokens().limit_stream(identity.token, stream).await?; - - Ok(Response(stream)) -} - -#[derive(serde::Deserialize)] -pub struct QueryParams { - pub resume_point: Sequence, -} - -#[derive(Debug)] -pub struct Response<S>(pub S); - -impl<S> IntoResponse for Response<S> -where - S: Stream<Item = Event> + Send + 'static, -{ - fn into_response(self) -> response::Response { - let Self(stream) = self; - let stream = stream.map(sse::Event::try_from); - let heartbeat = match Heartbeat.try_into().map_err(Internal::from) { - Ok(heartbeat) => heartbeat, - Err(err) => return err.into_response(), - }; - Sse::new(stream).keep_alive(heartbeat).into_response() - } -} - -impl TryFrom<Event> for sse::Event { - type Error = serde_json::Error; - - fn try_from(event: Event) -> Result<Self, Self::Error> { - let id = serde_json::to_string(&event.sequence())?; - let data = serde_json::to_string_pretty(&event)?; - - let event = Self::default().id(id).data(data); - - Ok(event) - } -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum Error { - Subscribe(#[from] app::Error), - Validate(#[from] ValidateError), -} - -impl IntoResponse for Error { - fn into_response(self) -> response::Response { - match self { - Self::Validate(ValidateError::InvalidToken) => Unauthorized.into_response(), - other => Internal::from(other).into_response(), - } - } -} |
