summaryrefslogtreecommitdiff
path: root/src/event/routes
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/routes')
-rw-r--r--src/event/routes/get.rs83
-rw-r--r--src/event/routes/mod.rs11
-rw-r--r--src/event/routes/test.rs29
3 files changed, 109 insertions, 14 deletions
diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs
new file mode 100644
index 0000000..357845a
--- /dev/null
+++ b/src/event/routes/get.rs
@@ -0,0 +1,83 @@
+use axum::{
+ extract::State,
+ response::{
+ self,
+ sse::{self, Sse},
+ IntoResponse,
+ },
+};
+use axum_extra::extract::Query;
+use futures::stream::{Stream, StreamExt as _};
+
+use crate::{
+ app::App,
+ error::{Internal, Unauthorized},
+ event::{extract::LastEventId, Event, ResumePoint, Sequence, Sequenced as _},
+ token::{app::ValidateError, extract::Identity},
+};
+
+pub async fn handler(
+ State(app): State<App>,
+ identity: Identity,
+ last_event_id: Option<LastEventId<Sequence>>,
+ Query(query): Query<QueryParams>,
+) -> Result<Response<impl Stream<Item = Event> + std::fmt::Debug>, Error> {
+ let resume_at = last_event_id
+ .map(LastEventId::into_inner)
+ .or(query.resume_point);
+
+ let stream = app.events().subscribe(resume_at).await?;
+ let stream = app.tokens().limit_stream(identity.token, stream).await?;
+
+ Ok(Response(stream))
+}
+
+#[derive(Default, serde::Deserialize)]
+pub struct QueryParams {
+ pub resume_point: ResumePoint,
+}
+
+#[derive(Debug)]
+pub struct Response<S>(pub S);
+
+impl<S> IntoResponse for Response<S>
+where
+ S: Stream<Item = Event> + Send + 'static,
+{
+ fn into_response(self) -> response::Response {
+ let Self(stream) = self;
+ let stream = stream.map(sse::Event::try_from);
+ Sse::new(stream)
+ .keep_alive(sse::KeepAlive::default())
+ .into_response()
+ }
+}
+
+impl TryFrom<Event> for sse::Event {
+ type Error = serde_json::Error;
+
+ fn try_from(event: Event) -> Result<Self, Self::Error> {
+ let id = serde_json::to_string(&event.sequence())?;
+ let data = serde_json::to_string_pretty(&event)?;
+
+ let event = Self::default().id(id).data(data);
+
+ Ok(event)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum Error {
+ Database(#[from] sqlx::Error),
+ Validate(#[from] ValidateError),
+}
+
+impl IntoResponse for Error {
+ fn into_response(self) -> response::Response {
+ match self {
+ Self::Validate(ValidateError::InvalidToken) => Unauthorized.into_response(),
+ other => Internal::from(other).into_response(),
+ }
+ }
+}
diff --git a/src/event/routes/mod.rs b/src/event/routes/mod.rs
new file mode 100644
index 0000000..57ab9db
--- /dev/null
+++ b/src/event/routes/mod.rs
@@ -0,0 +1,11 @@
+use axum::{routing::get, Router};
+
+use crate::app::App;
+
+mod get;
+#[cfg(test)]
+mod test;
+
+pub fn router() -> Router<App> {
+ Router::new().route("/api/events", get(get::handler))
+}
diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs
index 209a016..249f5c2 100644
--- a/src/event/routes/test.rs
+++ b/src/event/routes/test.rs
@@ -5,8 +5,9 @@ use futures::{
stream::{self, StreamExt as _},
};
+use super::get;
use crate::{
- event::{routes, Sequenced as _},
+ event::Sequenced as _,
test::fixtures::{self, future::Immediately as _},
};
@@ -23,7 +24,7 @@ async fn includes_historical_message() {
let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
+ let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -50,8 +51,8 @@ async fn includes_live_message() {
let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) =
- routes::events(State(app.clone()), subscriber, None, Query::default())
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -96,7 +97,7 @@ async fn includes_multiple_channels() {
let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
+ let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -134,7 +135,7 @@ async fn sequential_messages() {
let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
+ let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -182,7 +183,7 @@ async fn resumes_from() {
let resume_at = {
// First subscription
- let routes::Events(events) = routes::events(
+ let get::Response(events) = get::handler(
State(app.clone()),
subscriber.clone(),
None,
@@ -204,7 +205,7 @@ async fn resumes_from() {
};
// Resume after disconnect
- let routes::Events(resumed) = routes::events(
+ let get::Response(resumed) = get::handler(
State(app),
subscriber,
Some(resume_at.into()),
@@ -264,7 +265,7 @@ async fn serial_resume() {
];
// First subscription
- let routes::Events(events) = routes::events(
+ let get::Response(events) = get::handler(
State(app.clone()),
subscriber.clone(),
None,
@@ -302,7 +303,7 @@ async fn serial_resume() {
];
// Second subscription
- let routes::Events(events) = routes::events(
+ let get::Response(events) = get::handler(
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
@@ -340,7 +341,7 @@ async fn serial_resume() {
];
// Third subscription
- let routes::Events(events) = routes::events(
+ let get::Response(events) = get::handler(
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
@@ -380,8 +381,8 @@ async fn terminates_on_token_expiry() {
let subscriber =
fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await;
- let routes::Events(events) =
- routes::events(State(app.clone()), subscriber, None, Query::default())
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -427,7 +428,7 @@ async fn terminates_on_logout() {
let subscriber =
fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await;
- let routes::Events(events) = routes::events(
+ let get::Response(events) = get::handler(
State(app.clone()),
subscriber.clone(),
None,