summaryrefslogtreecommitdiff
path: root/src/event/routes.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/routes.rs')
-rw-r--r--src/event/routes.rs93
1 files changed, 93 insertions, 0 deletions
diff --git a/src/event/routes.rs b/src/event/routes.rs
new file mode 100644
index 0000000..77761ca
--- /dev/null
+++ b/src/event/routes.rs
@@ -0,0 +1,93 @@
+use axum::{
+ extract::State,
+ response::{
+ sse::{self, Sse},
+ IntoResponse, Response,
+ },
+ routing::get,
+ Router,
+};
+use axum_extra::extract::Query;
+use futures::stream::{Stream, StreamExt as _};
+
+use super::{extract::LastEventId, types};
+use crate::{
+ app::App,
+ error::{Internal, Unauthorized},
+ event::Sequence,
+ login::app::ValidateError,
+ token::extract::Identity,
+};
+
+#[cfg(test)]
+mod test;
+
+pub fn router() -> Router<App> {
+ Router::new().route("/api/events", get(events))
+}
+
+#[derive(Default, serde::Deserialize)]
+struct EventsQuery {
+ resume_point: Option<Sequence>,
+}
+
+async fn events(
+ State(app): State<App>,
+ identity: Identity,
+ last_event_id: Option<LastEventId<Sequence>>,
+ Query(query): Query<EventsQuery>,
+) -> Result<Events<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug>, EventsError> {
+ let resume_at = last_event_id
+ .map(LastEventId::into_inner)
+ .or(query.resume_point);
+
+ let stream = app.events().subscribe(resume_at).await?;
+ let stream = app.logins().limit_stream(identity.token, stream).await?;
+
+ Ok(Events(stream))
+}
+
+#[derive(Debug)]
+struct Events<S>(S);
+
+impl<S> IntoResponse for Events<S>
+where
+ S: Stream<Item = types::ChannelEvent> + Send + 'static,
+{
+ fn into_response(self) -> Response {
+ let Self(stream) = self;
+ let stream = stream.map(sse::Event::try_from);
+ Sse::new(stream)
+ .keep_alive(sse::KeepAlive::default())
+ .into_response()
+ }
+}
+
+impl TryFrom<types::ChannelEvent> for sse::Event {
+ type Error = serde_json::Error;
+
+ fn try_from(event: types::ChannelEvent) -> Result<Self, Self::Error> {
+ let id = serde_json::to_string(&event.sequence)?;
+ let data = serde_json::to_string_pretty(&event)?;
+
+ let event = Self::default().id(id).data(data);
+
+ Ok(event)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum EventsError {
+ DatabaseError(#[from] sqlx::Error),
+ ValidateError(#[from] ValidateError),
+}
+
+impl IntoResponse for EventsError {
+ fn into_response(self) -> Response {
+ match self {
+ Self::ValidateError(ValidateError::InvalidToken) => Unauthorized.into_response(),
+ other => Internal::from(other).into_response(),
+ }
+ }
+}