summaryrefslogtreecommitdiff
path: root/src/event/routes.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-16 20:14:33 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-16 20:14:33 -0400
commitea74daca4809e4008dd8d01039db9fff3be659d9 (patch)
tree5972cabf646e8d5e635e9e2a176bff56c178461a /src/event/routes.rs
parent56e16e29db55dae84549229d24b971f8bcf7da21 (diff)
Organizational pass on endpoints and routes.
Diffstat (limited to 'src/event/routes.rs')
-rw-r--r--src/event/routes.rs92
1 files changed, 0 insertions, 92 deletions
diff --git a/src/event/routes.rs b/src/event/routes.rs
deleted file mode 100644
index de6d248..0000000
--- a/src/event/routes.rs
+++ /dev/null
@@ -1,92 +0,0 @@
-use axum::{
- extract::State,
- response::{
- sse::{self, Sse},
- IntoResponse, Response,
- },
- routing::get,
- Router,
-};
-use axum_extra::extract::Query;
-use futures::stream::{Stream, StreamExt as _};
-
-use super::{extract::LastEventId, Event};
-use crate::{
- app::App,
- error::{Internal, Unauthorized},
- event::{ResumePoint, Sequence, Sequenced as _},
- token::{app::ValidateError, extract::Identity},
-};
-
-#[cfg(test)]
-mod test;
-
-pub fn router() -> Router<App> {
- Router::new().route("/api/events", get(events))
-}
-
-#[derive(Default, serde::Deserialize)]
-struct EventsQuery {
- resume_point: ResumePoint,
-}
-
-async fn events(
- State(app): State<App>,
- identity: Identity,
- last_event_id: Option<LastEventId<Sequence>>,
- Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = Event> + std::fmt::Debug>, EventsError> {
- let resume_at = last_event_id
- .map(LastEventId::into_inner)
- .or(query.resume_point);
-
- let stream = app.events().subscribe(resume_at).await?;
- let stream = app.tokens().limit_stream(identity.token, stream).await?;
-
- Ok(Events(stream))
-}
-
-#[derive(Debug)]
-struct Events<S>(S);
-
-impl<S> IntoResponse for Events<S>
-where
- S: Stream<Item = Event> + Send + 'static,
-{
- fn into_response(self) -> Response {
- let Self(stream) = self;
- let stream = stream.map(sse::Event::try_from);
- Sse::new(stream)
- .keep_alive(sse::KeepAlive::default())
- .into_response()
- }
-}
-
-impl TryFrom<Event> for sse::Event {
- type Error = serde_json::Error;
-
- fn try_from(event: Event) -> Result<Self, Self::Error> {
- let id = serde_json::to_string(&event.sequence())?;
- let data = serde_json::to_string_pretty(&event)?;
-
- let event = Self::default().id(id).data(data);
-
- Ok(event)
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub enum EventsError {
- DatabaseError(#[from] sqlx::Error),
- ValidateError(#[from] ValidateError),
-}
-
-impl IntoResponse for EventsError {
- fn into_response(self) -> Response {
- match self {
- Self::ValidateError(ValidateError::InvalidToken) => Unauthorized.into_response(),
- other => Internal::from(other).into_response(),
- }
- }
-}