summaryrefslogtreecommitdiff
path: root/src/event/routes/get.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2025-06-17 02:11:45 -0400
committerOwen Jacobson <owen@grimoire.ca>2025-06-18 18:31:40 -0400
commit4e3d5ccac99b24934c972e088cd7eb02bb95df06 (patch)
treec94f5a42f7e734b81892c1289a1d2b566706ba7c /src/event/routes/get.rs
parent5ed96f8e8b9d9f19ee249f5c73a5a21ef6bca09f (diff)
Handlers are _named operations_, which can be exposed via routes.
Each domain module that exposes handlers does so through a `handlers` child module, ideally as a top-level symbol that can be plugged directly into Axum's `MethodRouter`. Modules could make exceptions to this - kill the doctrinaire inside yourself, after all - but none of the API modules that actually exist need such exceptions, and consistency is useful. The related details of request types, URL types, response types, errors, &c &c are then organized into modules under `handlers`, along with their respective tests.
Diffstat (limited to 'src/event/routes/get.rs')
-rw-r--r--src/event/routes/get.rs82
1 files changed, 0 insertions, 82 deletions
diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs
deleted file mode 100644
index f6c91fa..0000000
--- a/src/event/routes/get.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-use axum::{
- extract::State,
- response::{
- self, IntoResponse,
- sse::{self, Sse},
- },
-};
-use axum_extra::extract::Query;
-use futures::stream::{Stream, StreamExt as _};
-
-use crate::{
- app::App,
- error::{Internal, Unauthorized},
- event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId},
- token::{app::ValidateError, extract::Identity},
-};
-
-pub async fn handler(
- State(app): State<App>,
- identity: Identity,
- last_event_id: Option<LastEventId<Sequence>>,
- Query(query): Query<QueryParams>,
-) -> Result<Response<impl Stream<Item = Event> + std::fmt::Debug>, Error> {
- let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner);
-
- let stream = app.events().subscribe(resume_at).await?;
- let stream = app.tokens().limit_stream(identity.token, stream).await?;
-
- Ok(Response(stream))
-}
-
-#[derive(serde::Deserialize)]
-pub struct QueryParams {
- pub resume_point: Sequence,
-}
-
-#[derive(Debug)]
-pub struct Response<S>(pub S);
-
-impl<S> IntoResponse for Response<S>
-where
- S: Stream<Item = Event> + Send + 'static,
-{
- fn into_response(self) -> response::Response {
- let Self(stream) = self;
- let stream = stream.map(sse::Event::try_from);
- let heartbeat = match Heartbeat.try_into().map_err(Internal::from) {
- Ok(heartbeat) => heartbeat,
- Err(err) => return err.into_response(),
- };
- Sse::new(stream).keep_alive(heartbeat).into_response()
- }
-}
-
-impl TryFrom<Event> for sse::Event {
- type Error = serde_json::Error;
-
- fn try_from(event: Event) -> Result<Self, Self::Error> {
- let id = serde_json::to_string(&event.sequence())?;
- let data = serde_json::to_string_pretty(&event)?;
-
- let event = Self::default().id(id).data(data);
-
- Ok(event)
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub enum Error {
- Subscribe(#[from] app::Error),
- Validate(#[from] ValidateError),
-}
-
-impl IntoResponse for Error {
- fn into_response(self) -> response::Response {
- match self {
- Self::Validate(ValidateError::InvalidToken) => Unauthorized.into_response(),
- other => Internal::from(other).into_response(),
- }
- }
-}