summaryrefslogtreecommitdiff
path: root/src/events/routes.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/routes.rs')
-rw-r--r--src/events/routes.rs69
1 files changed, 0 insertions, 69 deletions
diff --git a/src/events/routes.rs b/src/events/routes.rs
deleted file mode 100644
index ec9dae2..0000000
--- a/src/events/routes.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-use axum::{
- extract::State,
- response::{
- sse::{self, Sse},
- IntoResponse, Response,
- },
- routing::get,
- Router,
-};
-use futures::stream::{Stream, StreamExt as _};
-
-use super::{
- extract::LastEventId,
- types::{self, ResumePoint},
-};
-use crate::{app::App, error::Internal, login::extract::Identity};
-
-#[cfg(test)]
-mod test;
-
-pub fn router() -> Router<App> {
- Router::new().route("/api/events", get(events))
-}
-
-async fn events(
- State(app): State<App>,
- identity: Identity,
- last_event_id: Option<LastEventId<ResumePoint>>,
-) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, Internal> {
- let resume_at = last_event_id
- .map(LastEventId::into_inner)
- .unwrap_or_default();
-
- let stream = app.events().subscribe(resume_at).await?;
- let stream = app.logins().limit_stream(identity.token, stream);
-
- Ok(Events(stream))
-}
-
-#[derive(Debug)]
-struct Events<S>(S);
-
-impl<S> IntoResponse for Events<S>
-where
- S: Stream<Item = types::ResumableEvent> + Send + 'static,
-{
- fn into_response(self) -> Response {
- let Self(stream) = self;
- let stream = stream.map(sse::Event::try_from);
- Sse::new(stream)
- .keep_alive(sse::KeepAlive::default())
- .into_response()
- }
-}
-
-impl TryFrom<types::ResumableEvent> for sse::Event {
- type Error = serde_json::Error;
-
- fn try_from(value: types::ResumableEvent) -> Result<Self, Self::Error> {
- let types::ResumableEvent(resume_at, data) = value;
-
- let id = serde_json::to_string(&resume_at)?;
- let data = serde_json::to_string_pretty(&data)?;
-
- let event = Self::default().id(id).data(data);
-
- Ok(event)
- }
-}