Diffstat (limited to 'src/event')
-rw-r--r--  src/event/app.rs           72
-rw-r--r--  src/event/broadcaster.rs    3
-rw-r--r--  src/event/extract.rs       85
-rw-r--r--  src/event/mod.rs           75
-rw-r--r--  src/event/repo.rs          50
-rw-r--r--  src/event/routes.rs        92
-rw-r--r--  src/event/routes/test.rs  463
-rw-r--r--  src/event/sequence.rs      90
8 files changed, 930 insertions, 0 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
new file mode 100644
index 0000000..d664ec7
--- /dev/null
+++ b/src/event/app.rs
@@ -0,0 +1,72 @@
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+ Stream,
+};
+use itertools::Itertools as _;
+use sqlx::sqlite::SqlitePool;
+
+use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced};
+use crate::{
+ channel::{self, repo::Provider as _},
+ message::{self, repo::Provider as _},
+};
+
+pub struct Events<'a> {
+ db: &'a SqlitePool,
+ events: &'a Broadcaster,
+}
+
+impl<'a> Events<'a> {
+ pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self {
+ Self { db, events }
+ }
+
+ pub async fn subscribe(
+ &self,
+ resume_at: Option<Sequence>,
+ ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> {
+ // Subscribe before retrieving, to catch messages broadcast while we're
+ // querying the DB. We'll prune out duplicates later.
+ let live_messages = self.events.subscribe();
+
+ let mut tx = self.db.begin().await?;
+
+ let channels = tx.channels().replay(resume_at).await?;
+ let channel_events = channels
+ .iter()
+ .map(channel::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::after(resume_at))
+ .map(Event::from);
+
+ let messages = tx.messages().replay(resume_at).await?;
+ let message_events = messages
+ .iter()
+ .map(message::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::after(resume_at))
+ .map(Event::from);
+
+ let replay_events = channel_events
+ .merge_by(message_events, Sequence::merge)
+ .collect::<Vec<_>>();
+ let resume_live_at = replay_events.last().map(Sequenced::sequence);
+
+ let replay = stream::iter(replay_events);
+
+ let live_messages = live_messages
+ // Filtering on the broadcast resume point drops both messages from
+ // before resume_at and messages already delivered as part of
+ // `replay_events`.
+ .flat_map(stream::iter)
+ .filter(Self::resume(resume_live_at));
+
+ Ok(replay.chain(live_messages))
+ }
+
+ fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
+ let filter = Sequence::after(resume_at);
+ move |event| future::ready(filter(event))
+ }
+}
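
A note on `subscribe` above: duplicates between the replayed history and the live broadcast are possible precisely because the broadcast subscription starts before the database snapshot is taken, and they are pruned by sequence number rather than by identity. A minimal sketch of the same dedup idea, using plain integers in place of sequenced events (hypothetical, not this crate's API; assumes the `futures` and `tokio` crates):

    use futures::{future, stream, StreamExt as _};

    #[tokio::main]
    async fn main() {
        // Replayed history, already sorted by sequence number.
        let replay = vec![1_i64, 2, 3];
        // A live feed that overlaps the replay window.
        let live = stream::iter(vec![2_i64, 3, 4, 5]);

        // Resume the live feed strictly after the last replayed sequence,
        // mirroring `resume_live_at` above.
        let resume_live_at = replay.last().copied();
        let deduped = stream::iter(replay.clone()).chain(
            live.filter(move |seq| future::ready(Some(*seq) > resume_live_at)),
        );

        assert_eq!(deduped.collect::<Vec<_>>().await, vec![1, 2, 3, 4, 5]);
    }
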
diff --git a/src/event/broadcaster.rs b/src/event/broadcaster.rs
new file mode 100644
index 0000000..3c4efac
--- /dev/null
+++ b/src/event/broadcaster.rs
@@ -0,0 +1,3 @@
+use crate::broadcast;
+
+pub type Broadcaster = broadcast::Broadcaster<Vec<super::Event>>;
diff --git a/src/event/extract.rs b/src/event/extract.rs
new file mode 100644
index 0000000..e3021e2
--- /dev/null
+++ b/src/event/extract.rs
@@ -0,0 +1,85 @@
+use std::ops::Deref;
+
+use axum::{
+ extract::FromRequestParts,
+ http::{request::Parts, HeaderName, HeaderValue},
+};
+use axum_extra::typed_header::TypedHeader;
+use serde::{de::DeserializeOwned, Serialize};
+
+// A typed header. When used as a bare extractor, reads from the
+// `Last-Event-Id` HTTP header.
+pub struct LastEventId<T>(pub T);
+
+static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");
+
+impl<T> headers::Header for LastEventId<T>
+where
+ T: Serialize + DeserializeOwned,
+{
+ fn name() -> &'static HeaderName {
+ &LAST_EVENT_ID
+ }
+
+ fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
+ where
+ I: Iterator<Item = &'i HeaderValue>,
+ {
+ let value = values.next().ok_or_else(headers::Error::invalid)?;
+ let value = value.to_str().map_err(|_| headers::Error::invalid())?;
+ let value = serde_json::from_str(value).map_err(|_| headers::Error::invalid())?;
+ Ok(Self(value))
+ }
+
+ fn encode<E>(&self, values: &mut E)
+ where
+ E: Extend<HeaderValue>,
+ {
+ let Self(value) = self;
+ // Must panic or swallow the error; the trait offers no way to report one.
+ let value = serde_json::to_string(value).expect("value can be encoded as JSON");
+ let value = HeaderValue::from_str(&value).expect("LastEventId is a valid header value");
+
+ values.extend(std::iter::once(value));
+ }
+}
+
+#[async_trait::async_trait]
+impl<S, T> FromRequestParts<S> for LastEventId<T>
+where
+ S: Send + Sync,
+ T: Serialize + DeserializeOwned,
+{
+ type Rejection = <TypedHeader<Self> as FromRequestParts<S>>::Rejection;
+
+ async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
+ // This is purely for ergonomics: it allows `LastEventId` to be extracted
+ // without having to wrap it in `TypedHeader<>`. Callers _can_ still do
+ // that, but they aren't forced to.
+ let TypedHeader(last_event_id) = TypedHeader::from_request_parts(parts, state).await?;
+
+ Ok(last_event_id)
+ }
+}
+
+impl<T> Deref for LastEventId<T> {
+ type Target = T;
+
+ fn deref(&self) -> &Self::Target {
+ let Self(header) = self;
+ header
+ }
+}
+
+impl<T> From<T> for LastEventId<T> {
+ fn from(value: T) -> Self {
+ Self(value)
+ }
+}
+
+impl<T> LastEventId<T> {
+ pub fn into_inner(self) -> T {
+ let Self(value) = self;
+ value
+ }
+}
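
Because `Sequence` is a `#[serde(transparent)]` wrapper around an `i64` (see sequence.rs below), the JSON round-trip in `decode`/`encode` above means the header value is just the bare integer — which is also the exact value a browser's `EventSource` echoes back in `Last-Event-ID` on reconnect. A self-contained sketch of that round-trip (re-declaring `Sequence` locally for illustration; assumes `serde` and `serde_json`):

    use serde::{Deserialize, Serialize};

    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    #[serde(transparent)]
    struct Sequence(i64);

    fn main() {
        // Encoding: the header value is the JSON form of the sequence.
        let value = serde_json::to_string(&Sequence(42)).unwrap();
        assert_eq!(value, "42");

        // Decoding: parsing the raw header string recovers the sequence,
        // as `LastEventId::decode` does with the header's string value.
        let parsed: Sequence = serde_json::from_str(&value).unwrap();
        assert_eq!(parsed, Sequence(42));
    }
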
diff --git a/src/event/mod.rs b/src/event/mod.rs
new file mode 100644
index 0000000..1349fe6
--- /dev/null
+++ b/src/event/mod.rs
@@ -0,0 +1,75 @@
+use crate::{channel, message};
+
+pub mod app;
+pub mod broadcaster;
+mod extract;
+pub mod repo;
+mod routes;
+mod sequence;
+
+pub use self::{
+ routes::router,
+ sequence::{Instant, Sequence, Sequenced},
+};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Event {
+ #[serde(flatten)]
+ pub instant: Instant,
+ #[serde(flatten)]
+ pub kind: Kind,
+}
+
+impl Sequenced for Event {
+ fn instant(&self) -> Instant {
+ self.instant
+ }
+}
+
+impl From<channel::Event> for Event {
+ fn from(event: channel::Event) -> Self {
+ Self {
+ instant: event.instant,
+ kind: event.kind.into(),
+ }
+ }
+}
+
+impl From<message::Event> for Event {
+ fn from(event: message::Event) -> Self {
+ Self {
+ instant: event.instant(),
+ kind: event.kind.into(),
+ }
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum Kind {
+ #[serde(rename = "created")]
+ ChannelCreated(channel::event::Created),
+ #[serde(rename = "message")]
+ MessageSent(message::event::Sent),
+ MessageDeleted(message::event::Deleted),
+ #[serde(rename = "deleted")]
+ ChannelDeleted(channel::event::Deleted),
+}
+
+impl From<channel::event::Kind> for Kind {
+ fn from(kind: channel::event::Kind) -> Self {
+ match kind {
+ channel::event::Kind::Created(created) => Self::ChannelCreated(created),
+ channel::event::Kind::Deleted(deleted) => Self::ChannelDeleted(deleted),
+ }
+ }
+}
+
+impl From<message::event::Kind> for Kind {
+ fn from(kind: message::event::Kind) -> Self {
+ match kind {
+ message::event::Kind::Sent(sent) => Self::MessageSent(sent),
+ message::event::Kind::Deleted(deleted) => Self::MessageDeleted(deleted),
+ }
+ }
+}
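
The two `#[serde(flatten)]` attributes plus the internally tagged `Kind` mean an `Event` serializes to a single flat JSON object: `Instant` contributes `at` (its `sequence` field is skipped; see sequence.rs below), and `Kind` contributes `type` plus the variant payload's own fields. A sketch of the resulting shape, with stand-in payload types since `channel::event::Created`'s real fields aren't part of this diff:

    use serde::Serialize;

    // Stand-ins for the crate's types, for illustration only.
    #[derive(Serialize)]
    struct Instant {
        at: &'static str, // the crate uses its own DateTime here
    }

    #[derive(Serialize)]
    struct Created {
        name: &'static str, // hypothetical field
    }

    #[derive(Serialize)]
    #[serde(tag = "type", rename_all = "snake_case")]
    enum Kind {
        #[serde(rename = "created")]
        ChannelCreated(Created),
    }

    #[derive(Serialize)]
    struct Event {
        #[serde(flatten)]
        instant: Instant,
        #[serde(flatten)]
        kind: Kind,
    }

    fn main() {
        let event = Event {
            instant: Instant { at: "2024-01-01T00:00:00Z" },
            kind: Kind::ChannelCreated(Created { name: "general" }),
        };
        // {"at":"2024-01-01T00:00:00Z","type":"created","name":"general"}
        println!("{}", serde_json::to_string(&event).unwrap());
    }
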
diff --git a/src/event/repo.rs b/src/event/repo.rs
new file mode 100644
index 0000000..40d6a53
--- /dev/null
+++ b/src/event/repo.rs
@@ -0,0 +1,50 @@
+use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
+
+use crate::{
+ clock::DateTime,
+ event::{Instant, Sequence},
+};
+
+pub trait Provider {
+ fn sequence(&mut self) -> Sequences;
+}
+
+impl<'c> Provider for Transaction<'c, Sqlite> {
+ fn sequence(&mut self) -> Sequences {
+ Sequences(self)
+ }
+}
+
+pub struct Sequences<'t>(&'t mut SqliteConnection);
+
+impl<'c> Sequences<'c> {
+ pub async fn next(&mut self, at: &DateTime) -> Result<Instant, sqlx::Error> {
+ let next = sqlx::query_scalar!(
+ r#"
+ update event_sequence
+ set last_value = last_value + 1
+ returning last_value as "next_value: Sequence"
+ "#,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(Instant {
+ at: *at,
+ sequence: next,
+ })
+ }
+
+ pub async fn current(&mut self) -> Result<Sequence, sqlx::Error> {
+ let current = sqlx::query_scalar!(
+ r#"
+ select last_value as "last_value: Sequence"
+ from event_sequence
+ "#,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(current)
+ }
+}
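
`Sequences::next` allocates IDs by bumping a single global counter row inside the caller's transaction, so an event's sequence number commits atomically with the event itself. The `update … returning` form only works if `event_sequence` always holds exactly one row. The migration isn't part of this diff; a plausible shape for it (an assumption, not the repository's actual schema) would be:

    create table event_sequence (last_value integer not null);
    insert into event_sequence (last_value) values (0);
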
diff --git a/src/event/routes.rs b/src/event/routes.rs
new file mode 100644
index 0000000..5b9c7e3
--- /dev/null
+++ b/src/event/routes.rs
@@ -0,0 +1,92 @@
+use axum::{
+ extract::State,
+ response::{
+ sse::{self, Sse},
+ IntoResponse, Response,
+ },
+ routing::get,
+ Router,
+};
+use axum_extra::extract::Query;
+use futures::stream::{Stream, StreamExt as _};
+
+use super::{extract::LastEventId, Event};
+use crate::{
+ app::App,
+ error::{Internal, Unauthorized},
+ event::{Sequence, Sequenced as _},
+ token::{app::ValidateError, extract::Identity},
+};
+
+#[cfg(test)]
+mod test;
+
+pub fn router() -> Router<App> {
+ Router::new().route("/api/events", get(events))
+}
+
+#[derive(Default, serde::Deserialize)]
+struct EventsQuery {
+ resume_point: Option<Sequence>,
+}
+
+async fn events(
+ State(app): State<App>,
+ identity: Identity,
+ last_event_id: Option<LastEventId<Sequence>>,
+ Query(query): Query<EventsQuery>,
+) -> Result<Events<impl Stream<Item = Event> + std::fmt::Debug>, EventsError> {
+ let resume_at = last_event_id
+ .map(LastEventId::into_inner)
+ .or(query.resume_point);
+
+ let stream = app.events().subscribe(resume_at).await?;
+ let stream = app.tokens().limit_stream(identity.token, stream).await?;
+
+ Ok(Events(stream))
+}
+
+#[derive(Debug)]
+struct Events<S>(S);
+
+impl<S> IntoResponse for Events<S>
+where
+ S: Stream<Item = Event> + Send + 'static,
+{
+ fn into_response(self) -> Response {
+ let Self(stream) = self;
+ let stream = stream.map(sse::Event::try_from);
+ Sse::new(stream)
+ .keep_alive(sse::KeepAlive::default())
+ .into_response()
+ }
+}
+
+impl TryFrom<Event> for sse::Event {
+ type Error = serde_json::Error;
+
+ fn try_from(event: Event) -> Result<Self, Self::Error> {
+ let id = serde_json::to_string(&event.sequence())?;
+ let data = serde_json::to_string_pretty(&event)?;
+
+ let event = Self::default().id(id).data(data);
+
+ Ok(event)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum EventsError {
+ DatabaseError(#[from] sqlx::Error),
+ ValidateError(#[from] ValidateError),
+}
+
+impl IntoResponse for EventsError {
+ fn into_response(self) -> Response {
+ match self {
+ Self::ValidateError(ValidateError::InvalidToken) => Unauthorized.into_response(),
+ other => Internal::from(other).into_response(),
+ }
+ }
+}
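
On the wire, each event from this endpoint is framed per the SSE spec: the `id` field carries the JSON-encoded sequence number, and because `to_string_pretty` produces multi-line JSON, axum emits one `data:` line per line of the payload. Roughly (field contents illustrative; the exact payload depends on the event kind):

    id: 42
    data: {
    data:   "at": "2024-01-01T00:00:00Z",
    data:   "type": "message",
    data:   ...
    data: }

A browser's `EventSource` remembers the last `id:` it saw and sends it back in the `Last-Event-ID` request header when it reconnects, which is why `events` prefers the `LastEventId` extractor and falls back to the `resume_point` query parameter for clients that can't set headers.
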
diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs
new file mode 100644
index 0000000..ba9953e
--- /dev/null
+++ b/src/event/routes/test.rs
@@ -0,0 +1,463 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+};
+
+use crate::{
+ event::{routes, Sequenced as _},
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn includes_historical_message() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+ let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the structure of the response.
+
+ let event = events
+ .filter(fixtures::filter::messages())
+ .next()
+ .immediately()
+ .await
+ .expect("delivered stored message");
+
+ assert!(fixtures::event::message_sent(&event, &message));
+}
+
+#[tokio::test]
+async fn includes_live_message() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+ let routes::Events(events) =
+ routes::events(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the semantics
+
+ let sender = fixtures::login::create(&app).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ let event = events
+ .filter(fixtures::filter::messages())
+ .next()
+ .immediately()
+ .await
+ .expect("delivered live message");
+
+ assert!(fixtures::event::message_sent(&event, &message));
+}
+
+#[tokio::test]
+async fn includes_multiple_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+
+ let channels = [
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ ];
+
+ let messages = stream::iter(channels)
+ .then(|channel| {
+ let app = app.clone();
+ let sender = sender.clone();
+ let channel = channel.clone();
+ async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await }
+ })
+ .collect::<Vec<_>>()
+ .await;
+
+ // Call the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+ let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the structure of the response.
+
+ let events = events
+ .filter(fixtures::filter::messages())
+ .take(messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in &messages {
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
+ }
+}
+
+#[tokio::test]
+async fn sequential_messages() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app).await;
+
+ let messages = vec![
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+ let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the structure of the response.
+
+ let mut events = events.filter(|event| {
+ future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message)),
+ )
+ });
+
+ // Verify delivery in order
+ for message in &messages {
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("undelivered messages remaining");
+
+ assert!(fixtures::event::message_sent(&event, message));
+ }
+}
+
+#[tokio::test]
+async fn resumes_from() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app).await;
+
+ let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ let later_messages = vec![
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+
+ let resume_at = {
+ // First subscription
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ let event = events
+ .filter(fixtures::filter::messages())
+ .next()
+ .immediately()
+ .await
+ .expect("delivered events");
+
+ assert!(fixtures::event::message_sent(&event, &initial_message));
+
+ event.sequence()
+ };
+
+ // Resume after disconnect
+ let routes::Events(resumed) = routes::events(
+ State(app),
+ subscriber,
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the structure of the response.
+
+ let events = resumed
+ .take(later_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in &later_messages {
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
+ }
+}
+
+// This test reproduces a real bug I hit while developing the
+// vector-of-sequences approach to resuming events. A small omission caused
+// the event IDs in a resumed stream to _omit_ channels that appeared in the
+// original stream, until those channels also appeared in the resumed stream.
+//
+// Clients would see something like:
+// * In the original stream, Cfoo=5,Cbar=8
+// * In the resumed stream, Cfoo=6 (no Cbar sequence number)
+//
+// Disconnecting and reconnecting a second time, using an event ID from that
+// initial period of the first resumed stream, would then cause the second
+// resume attempt to restart all other channels from the beginning, rather
+// than from where the first disconnection happened.
+//
+// This is a real and valid behaviour for clients!
+#[tokio::test]
+async fn serial_resume() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
+ let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
+
+ let resume_at = {
+ let initial_messages = [
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
+ ];
+
+ // First subscription
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ let events = events
+ .filter(fixtures::filter::messages())
+ .take(initial_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in &initial_messages {
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
+ }
+
+ let event = events.last().expect("this vec is non-empty");
+
+ event.sequence()
+ };
+
+ // Resume after disconnect
+ let resume_at = {
+ let resume_messages = [
+ // Note that channel_b does not appear here. The buggy behaviour
+ // would be masked if channel_b happened to send a new message
+ // into the resumed event stream.
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ ];
+
+ // Second subscription
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ let events = events
+ .filter(fixtures::filter::messages())
+ .take(resume_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in &resume_messages {
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
+ }
+
+ let event = events.last().expect("this vec is non-empty");
+
+ event.sequence()
+ };
+
+ // Resume after disconnect a second time
+ {
+ // At this point, we can send on either channel and demonstrate the
+ // problem. The resume point should be before both of these messages,
+ // but after _all_ prior messages.
+ let final_messages = [
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
+ ];
+
+ // Third subscription
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ let events = events
+ .filter(fixtures::filter::messages())
+ .take(final_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ // This set of messages, in particular, _should not_ include any prior
+ // messages from `initial_messages` or `resume_messages`.
+ for message in &final_messages {
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
+ }
+ };
+}
+
+#[tokio::test]
+async fn terminates_on_token_expiry() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber =
+ fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await;
+
+ let routes::Events(events) =
+ routes::events(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.tokens()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ assert!(events
+ .filter(|event| future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message))
+ ))
+ .next()
+ .immediately()
+ .await
+ .is_none());
+}
+
+#[tokio::test]
+async fn terminates_on_logout() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app).await;
+
+ // Subscribe via the endpoint
+
+ let subscriber_creds = fixtures::login::create_with_password(&app).await;
+ let subscriber_token =
+ fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::now()).await;
+ let subscriber =
+ fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await;
+
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the resulting stream's behaviour
+
+ app.tokens()
+ .logout(&subscriber.token)
+ .await
+ .expect("expiring tokens succeeds");
+
+ // These should not be delivered.
+ let messages = [
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ assert!(events
+ .filter(|event| future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message))
+ ))
+ .next()
+ .immediately()
+ .await
+ .is_none());
+}
diff --git a/src/event/sequence.rs b/src/event/sequence.rs
new file mode 100644
index 0000000..fbe3711
--- /dev/null
+++ b/src/event/sequence.rs
@@ -0,0 +1,90 @@
+use std::fmt;
+
+use crate::clock::DateTime;
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Instant {
+ pub at: DateTime,
+ #[serde(skip)]
+ pub sequence: Sequence,
+}
+
+impl From<Instant> for Sequence {
+ fn from(instant: Instant) -> Self {
+ instant.sequence
+ }
+}
+
+#[derive(
+ Clone,
+ Copy,
+ Debug,
+ Eq,
+ Ord,
+ PartialEq,
+ PartialOrd,
+ serde::Deserialize,
+ serde::Serialize,
+ sqlx::Type,
+)]
+#[serde(transparent)]
+#[sqlx(transparent)]
+pub struct Sequence(i64);
+
+impl fmt::Display for Sequence {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let Self(value) = self;
+ value.fmt(f)
+ }
+}
+
+impl Sequence {
+ pub fn up_to<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool
+ where
+ E: Sequenced,
+ {
+ move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point)
+ }
+
+ pub fn after<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool
+ where
+ E: Sequenced,
+ {
+ move |event| resume_point < Some(event.sequence())
+ }
+
+ pub fn start_from<E>(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool
+ where
+ E: Sequenced,
+ {
+ move |event| resume_point <= event.sequence()
+ }
+
+ pub fn merge<E>(a: &E, b: &E) -> bool
+ where
+ E: Sequenced,
+ {
+ a.sequence() < b.sequence()
+ }
+}
+
+pub trait Sequenced {
+ fn instant(&self) -> Instant;
+
+ fn sequence(&self) -> Sequence {
+ self.instant().into()
+ }
+}
+
+impl<E> Sequenced for &E
+where
+ E: Sequenced,
+{
+ fn instant(&self) -> Instant {
+ (*self).instant()
+ }
+
+ fn sequence(&self) -> Sequence {
+ (*self).sequence()
+ }
+}
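
The resume filters lean on `Option`'s derived ordering, where `None` is less than any `Some`: `Sequence::after(None)` therefore accepts every event, while `Sequence::after(Some(n))` is strictly-after, excluding the resume point itself. A quick check of those semantics with plain `Option<i64>` standing in for `Option<Sequence>`:

    fn main() {
        // No resume point: everything passes, even the smallest sequence.
        let resume_at: Option<i64> = None;
        assert!(resume_at < Some(i64::MIN));

        // Strictly-after: the resume point itself is excluded, so an event
        // already seen at sequence 5 is not redelivered.
        let resume_at = Some(5_i64);
        assert!(!(resume_at < Some(5)));
        assert!(resume_at < Some(6));
    }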