summaryrefslogtreecommitdiff
path: root/src/event
diff options
context:
space:
mode:
Diffstat (limited to 'src/event')
-rw-r--r--src/event/app.rs30
-rw-r--r--src/event/repo.rs5
-rw-r--r--src/event/routes/get.rs (renamed from src/event/routes.rs)51
-rw-r--r--src/event/routes/mod.rs11
-rw-r--r--src/event/routes/test.rs88
-rw-r--r--src/event/sequence.rs11
6 files changed, 115 insertions, 81 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
index 951ce25..c754388 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -11,6 +11,7 @@ use crate::{
channel::{self, repo::Provider as _},
login::{self, repo::Provider as _},
message::{self, repo::Provider as _},
+ name,
};
pub struct Events<'a> {
@@ -26,7 +27,7 @@ impl<'a> Events<'a> {
pub async fn subscribe(
&self,
resume_at: impl Into<ResumePoint>,
- ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> {
+ ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, Error> {
let resume_at = resume_at.into();
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
@@ -81,3 +82,30 @@ impl<'a> Events<'a> {
move |event| future::ready(filter(event))
}
}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum Error {
+ Database(#[from] sqlx::Error),
+ Name(#[from] name::Error),
+}
+
+impl From<login::repo::LoadError> for Error {
+ fn from(error: login::repo::LoadError) -> Self {
+ use login::repo::LoadError;
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
+}
+
+impl From<channel::repo::LoadError> for Error {
+ fn from(error: channel::repo::LoadError) -> Self {
+ use channel::repo::LoadError;
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
+}
diff --git a/src/event/repo.rs b/src/event/repo.rs
index 40d6a53..56beeea 100644
--- a/src/event/repo.rs
+++ b/src/event/repo.rs
@@ -29,10 +29,7 @@ impl<'c> Sequences<'c> {
.fetch_one(&mut *self.0)
.await?;
- Ok(Instant {
- at: *at,
- sequence: next,
- })
+ Ok(Instant::new(*at, next))
}
pub async fn current(&mut self) -> Result<Sequence, sqlx::Error> {
diff --git a/src/event/routes.rs b/src/event/routes/get.rs
index de6d248..22e8762 100644
--- a/src/event/routes.rs
+++ b/src/event/routes/get.rs
@@ -1,41 +1,27 @@
use axum::{
extract::State,
response::{
+ self,
sse::{self, Sse},
- IntoResponse, Response,
+ IntoResponse,
},
- routing::get,
- Router,
};
use axum_extra::extract::Query;
use futures::stream::{Stream, StreamExt as _};
-use super::{extract::LastEventId, Event};
use crate::{
app::App,
error::{Internal, Unauthorized},
- event::{ResumePoint, Sequence, Sequenced as _},
+ event::{app, extract::LastEventId, Event, ResumePoint, Sequence, Sequenced as _},
token::{app::ValidateError, extract::Identity},
};
-#[cfg(test)]
-mod test;
-
-pub fn router() -> Router<App> {
- Router::new().route("/api/events", get(events))
-}
-
-#[derive(Default, serde::Deserialize)]
-struct EventsQuery {
- resume_point: ResumePoint,
-}
-
-async fn events(
+pub async fn handler(
State(app): State<App>,
identity: Identity,
last_event_id: Option<LastEventId<Sequence>>,
- Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = Event> + std::fmt::Debug>, EventsError> {
+ Query(query): Query<QueryParams>,
+) -> Result<Response<impl Stream<Item = Event> + std::fmt::Debug>, Error> {
let resume_at = last_event_id
.map(LastEventId::into_inner)
.or(query.resume_point);
@@ -43,17 +29,22 @@ async fn events(
let stream = app.events().subscribe(resume_at).await?;
let stream = app.tokens().limit_stream(identity.token, stream).await?;
- Ok(Events(stream))
+ Ok(Response(stream))
+}
+
+#[derive(Default, serde::Deserialize)]
+pub struct QueryParams {
+ pub resume_point: ResumePoint,
}
#[derive(Debug)]
-struct Events<S>(S);
+pub struct Response<S>(pub S);
-impl<S> IntoResponse for Events<S>
+impl<S> IntoResponse for Response<S>
where
S: Stream<Item = Event> + Send + 'static,
{
- fn into_response(self) -> Response {
+ fn into_response(self) -> response::Response {
let Self(stream) = self;
let stream = stream.map(sse::Event::try_from);
Sse::new(stream)
@@ -77,15 +68,15 @@ impl TryFrom<Event> for sse::Event {
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
-pub enum EventsError {
- DatabaseError(#[from] sqlx::Error),
- ValidateError(#[from] ValidateError),
+pub enum Error {
+ Subscribe(#[from] app::Error),
+ Validate(#[from] ValidateError),
}
-impl IntoResponse for EventsError {
- fn into_response(self) -> Response {
+impl IntoResponse for Error {
+ fn into_response(self) -> response::Response {
match self {
- Self::ValidateError(ValidateError::InvalidToken) => Unauthorized.into_response(),
+ Self::Validate(ValidateError::InvalidToken) => Unauthorized.into_response(),
other => Internal::from(other).into_response(),
}
}
diff --git a/src/event/routes/mod.rs b/src/event/routes/mod.rs
new file mode 100644
index 0000000..57ab9db
--- /dev/null
+++ b/src/event/routes/mod.rs
@@ -0,0 +1,11 @@
+use axum::{routing::get, Router};
+
+use crate::app::App;
+
+mod get;
+#[cfg(test)]
+mod test;
+
+pub fn router() -> Router<App> {
+ Router::new().route("/api/events", get(get::handler))
+}
diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs
index 209a016..49f8094 100644
--- a/src/event/routes/test.rs
+++ b/src/event/routes/test.rs
@@ -5,8 +5,9 @@ use futures::{
stream::{self, StreamExt as _},
};
+use super::get;
use crate::{
- event::{routes, Sequenced as _},
+ event::Sequenced as _,
test::fixtures::{self, future::Immediately as _},
};
@@ -21,16 +22,15 @@ async fn includes_historical_message() {
// Call the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
- let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
// Verify the structure of the response.
let event = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.next()
.immediately()
.await
@@ -48,10 +48,9 @@ async fn includes_live_message() {
// Call the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
- let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) =
- routes::events(State(app.clone()), subscriber, None, Query::default())
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -61,7 +60,7 @@ async fn includes_live_message() {
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
let event = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.next()
.immediately()
.await
@@ -94,16 +93,15 @@ async fn includes_multiple_channels() {
// Call the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
- let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
// Verify the structure of the response.
let events = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.take(messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -132,21 +130,22 @@ async fn sequential_messages() {
// Call the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
- let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
// Verify the structure of the response.
- let mut events = events.filter(|event| {
- future::ready(
- messages
- .iter()
- .any(|message| fixtures::event::message_sent(event, message)),
- )
- });
+ let mut events = events
+ .filter_map(fixtures::message::events)
+ .filter(|event| {
+ future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message)),
+ )
+ });
// Verify delivery in order
for message in &messages {
@@ -177,12 +176,11 @@ async fn resumes_from() {
// Call the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
- let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
let resume_at = {
// First subscription
- let routes::Events(events) = routes::events(
+ let get::Response(events) = get::handler(
State(app.clone()),
subscriber.clone(),
None,
@@ -192,7 +190,7 @@ async fn resumes_from() {
.expect("subscribe never fails");
let event = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.next()
.immediately()
.await
@@ -204,7 +202,7 @@ async fn resumes_from() {
};
// Resume after disconnect
- let routes::Events(resumed) = routes::events(
+ let get::Response(resumed) = get::handler(
State(app),
subscriber,
Some(resume_at.into()),
@@ -216,6 +214,7 @@ async fn resumes_from() {
// Verify the structure of the response.
let events = resumed
+ .filter_map(fixtures::message::events)
.take(later_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -254,8 +253,7 @@ async fn serial_resume() {
// Call the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
- let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
let resume_at = {
let initial_messages = [
@@ -264,7 +262,7 @@ async fn serial_resume() {
];
// First subscription
- let routes::Events(events) = routes::events(
+ let get::Response(events) = get::handler(
State(app.clone()),
subscriber.clone(),
None,
@@ -274,7 +272,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.take(initial_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -302,7 +300,7 @@ async fn serial_resume() {
];
// Second subscription
- let routes::Events(events) = routes::events(
+ let get::Response(events) = get::handler(
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
@@ -312,7 +310,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.take(resume_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -340,7 +338,7 @@ async fn serial_resume() {
];
// Third subscription
- let routes::Events(events) = routes::events(
+ let get::Response(events) = get::handler(
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
@@ -350,7 +348,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.take(final_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -378,10 +376,10 @@ async fn terminates_on_token_expiry() {
let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber =
- fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await;
+ fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await;
- let routes::Events(events) =
- routes::events(State(app.clone()), subscriber, None, Query::default())
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -400,6 +398,7 @@ async fn terminates_on_token_expiry() {
];
assert!(events
+ .filter_map(fixtures::message::events)
.filter(|event| future::ready(
messages
.iter()
@@ -421,13 +420,9 @@ async fn terminates_on_logout() {
// Subscribe via the endpoint
- let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
- let subscriber_token =
- fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::now()).await;
- let subscriber =
- fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await;
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- let routes::Events(events) = routes::events(
+ let get::Response(events) = get::handler(
State(app.clone()),
subscriber.clone(),
None,
@@ -451,6 +446,7 @@ async fn terminates_on_logout() {
];
assert!(events
+ .filter_map(fixtures::message::events)
.filter(|event| future::ready(
messages
.iter()
diff --git a/src/event/sequence.rs b/src/event/sequence.rs
index bf6d5b8..9bc399b 100644
--- a/src/event/sequence.rs
+++ b/src/event/sequence.rs
@@ -10,6 +10,17 @@ pub struct Instant {
pub sequence: Sequence,
}
+impl Instant {
+ pub fn new(at: DateTime, sequence: Sequence) -> Self {
+ Self { at, sequence }
+ }
+
+ pub fn optional(at: Option<DateTime>, sequence: Option<Sequence>) -> Option<Self> {
+ at.zip(sequence)
+ .map(|(at, sequence)| Self::new(at, sequence))
+ }
+}
+
impl From<Instant> for Sequence {
fn from(instant: Instant) -> Self {
instant.sequence