summaryrefslogtreecommitdiff
path: root/src/events/routes.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-02 00:41:25 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-02 00:41:38 -0400
commit357116366c1307bedaac6a3dfe9c5ed8e0e0c210 (patch)
treed701378187d8b0f99d524991925e8348c6cab0d6 /src/events/routes.rs
parentf878f0b5eaa44e8ee8d67cbfd706926ff2119113 (diff)
First pass on reorganizing the backend.
This is primarily renames and repackagings.
Diffstat (limited to 'src/events/routes.rs')
-rw-r--r--src/events/routes.rs92
1 files changed, 0 insertions, 92 deletions
diff --git a/src/events/routes.rs b/src/events/routes.rs
deleted file mode 100644
index d81c7fb..0000000
--- a/src/events/routes.rs
+++ /dev/null
@@ -1,92 +0,0 @@
-use axum::{
- extract::State,
- response::{
- sse::{self, Sse},
- IntoResponse, Response,
- },
- routing::get,
- Router,
-};
-use axum_extra::extract::Query;
-use futures::stream::{Stream, StreamExt as _};
-
-use super::{extract::LastEventId, types};
-use crate::{
- app::App,
- error::{Internal, Unauthorized},
- login::{app::ValidateError, extract::Identity},
- repo::sequence::Sequence,
-};
-
-#[cfg(test)]
-mod test;
-
-pub fn router() -> Router<App> {
- Router::new().route("/api/events", get(events))
-}
-
-#[derive(Default, serde::Deserialize)]
-struct EventsQuery {
- resume_point: Option<Sequence>,
-}
-
-async fn events(
- State(app): State<App>,
- identity: Identity,
- last_event_id: Option<LastEventId<Sequence>>,
- Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug>, EventsError> {
- let resume_at = last_event_id
- .map(LastEventId::into_inner)
- .or(query.resume_point);
-
- let stream = app.events().subscribe(resume_at).await?;
- let stream = app.logins().limit_stream(identity.token, stream).await?;
-
- Ok(Events(stream))
-}
-
-#[derive(Debug)]
-struct Events<S>(S);
-
-impl<S> IntoResponse for Events<S>
-where
- S: Stream<Item = types::ChannelEvent> + Send + 'static,
-{
- fn into_response(self) -> Response {
- let Self(stream) = self;
- let stream = stream.map(sse::Event::try_from);
- Sse::new(stream)
- .keep_alive(sse::KeepAlive::default())
- .into_response()
- }
-}
-
-impl TryFrom<types::ChannelEvent> for sse::Event {
- type Error = serde_json::Error;
-
- fn try_from(event: types::ChannelEvent) -> Result<Self, Self::Error> {
- let id = serde_json::to_string(&event.sequence)?;
- let data = serde_json::to_string_pretty(&event)?;
-
- let event = Self::default().id(id).data(data);
-
- Ok(event)
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub enum EventsError {
- DatabaseError(#[from] sqlx::Error),
- ValidateError(#[from] ValidateError),
-}
-
-impl IntoResponse for EventsError {
- fn into_response(self) -> Response {
- match self {
- Self::ValidateError(ValidateError::InvalidToken) => Unauthorized.into_response(),
- other => Internal::from(other).into_response(),
- }
- }
-}