summaryrefslogtreecommitdiff
path: root/src/event/handlers/stream/mod.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2025-06-17 02:11:45 -0400
committerOwen Jacobson <owen@grimoire.ca>2025-06-18 18:31:40 -0400
commit4e3d5ccac99b24934c972e088cd7eb02bb95df06 (patch)
treec94f5a42f7e734b81892c1289a1d2b566706ba7c /src/event/handlers/stream/mod.rs
parent5ed96f8e8b9d9f19ee249f5c73a5a21ef6bca09f (diff)
Handlers are _named operations_, which can be exposed via routes.
Each domain module that exposes handlers does so through a `handlers` child module, ideally as a top-level symbol that can be plugged directly into Axum's `MethodRouter`. Modules could make exceptions to this - kill the doctrinaire inside yourself, after all - but none of the API modules that actually exist need such exceptions, and consistency is useful. The related details of request types, URL types, response types, errors, &c &c are then organized into modules under `handlers`, along with their respective tests.
Diffstat (limited to 'src/event/handlers/stream/mod.rs')
-rw-r--r--src/event/handlers/stream/mod.rs85
1 files changed, 85 insertions, 0 deletions
diff --git a/src/event/handlers/stream/mod.rs b/src/event/handlers/stream/mod.rs
new file mode 100644
index 0000000..d0d3f08
--- /dev/null
+++ b/src/event/handlers/stream/mod.rs
@@ -0,0 +1,85 @@
+use axum::{
+ extract::State,
+ response::{
+ self, IntoResponse,
+ sse::{self, Sse},
+ },
+};
+use axum_extra::extract::Query;
+use futures::stream::{Stream, StreamExt as _};
+
+use crate::{
+ app::App,
+ error::{Internal, Unauthorized},
+ event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId},
+ token::{app::ValidateError, extract::Identity},
+};
+
+#[cfg(test)]
+mod test;
+
+pub async fn handler(
+ State(app): State<App>,
+ identity: Identity,
+ last_event_id: Option<LastEventId<Sequence>>,
+ Query(query): Query<QueryParams>,
+) -> Result<Response<impl Stream<Item = Event> + std::fmt::Debug>, Error> {
+ let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner);
+
+ let stream = app.events().subscribe(resume_at).await?;
+ let stream = app.tokens().limit_stream(identity.token, stream).await?;
+
+ Ok(Response(stream))
+}
+
+#[derive(serde::Deserialize)]
+pub struct QueryParams {
+ pub resume_point: Sequence,
+}
+
+#[derive(Debug)]
+pub struct Response<S>(pub S);
+
+impl<S> IntoResponse for Response<S>
+where
+ S: Stream<Item = Event> + Send + 'static,
+{
+ fn into_response(self) -> response::Response {
+ let Self(stream) = self;
+ let stream = stream.map(sse::Event::try_from);
+ let heartbeat = match Heartbeat.try_into().map_err(Internal::from) {
+ Ok(heartbeat) => heartbeat,
+ Err(err) => return err.into_response(),
+ };
+ Sse::new(stream).keep_alive(heartbeat).into_response()
+ }
+}
+
+impl TryFrom<Event> for sse::Event {
+ type Error = serde_json::Error;
+
+ fn try_from(event: Event) -> Result<Self, Self::Error> {
+ let id = serde_json::to_string(&event.sequence())?;
+ let data = serde_json::to_string_pretty(&event)?;
+
+ let event = Self::default().id(id).data(data);
+
+ Ok(event)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum Error {
+ Subscribe(#[from] app::Error),
+ Validate(#[from] ValidateError),
+}
+
+impl IntoResponse for Error {
+ fn into_response(self) -> response::Response {
+ match self {
+ Self::Validate(ValidateError::InvalidToken) => Unauthorized.into_response(),
+ other => Internal::from(other).into_response(),
+ }
+ }
+}