diff options
Diffstat (limited to 'src/event')
| -rw-r--r-- | src/event/app.rs | 72 | ||||
| -rw-r--r-- | src/event/broadcaster.rs | 3 | ||||
| -rw-r--r-- | src/event/extract.rs | 85 | ||||
| -rw-r--r-- | src/event/mod.rs | 75 | ||||
| -rw-r--r-- | src/event/repo.rs | 50 | ||||
| -rw-r--r-- | src/event/routes.rs | 92 | ||||
| -rw-r--r-- | src/event/routes/test.rs | 463 | ||||
| -rw-r--r-- | src/event/sequence.rs | 90 |
8 files changed, 930 insertions, 0 deletions
diff --git a/src/event/app.rs b/src/event/app.rs new file mode 100644 index 0000000..d664ec7 --- /dev/null +++ b/src/event/app.rs @@ -0,0 +1,72 @@ +use futures::{ + future, + stream::{self, StreamExt as _}, + Stream, +}; +use itertools::Itertools as _; +use sqlx::sqlite::SqlitePool; + +use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; +use crate::{ + channel::{self, repo::Provider as _}, + message::{self, repo::Provider as _}, +}; + +pub struct Events<'a> { + db: &'a SqlitePool, + events: &'a Broadcaster, +} + +impl<'a> Events<'a> { + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } + } + + pub async fn subscribe( + &self, + resume_at: Option<Sequence>, + ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> { + // Subscribe before retrieving, to catch messages broadcast while we're + // querying the DB. We'll prune out duplicates later. + let live_messages = self.events.subscribe(); + + let mut tx = self.db.begin().await?; + + let channels = tx.channels().replay(resume_at).await?; + let channel_events = channels + .iter() + .map(channel::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::after(resume_at)) + .map(Event::from); + + let messages = tx.messages().replay(resume_at).await?; + let message_events = messages + .iter() + .map(message::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::after(resume_at)) + .map(Event::from); + + let replay_events = channel_events + .merge_by(message_events, Sequence::merge) + .collect::<Vec<_>>(); + let resume_live_at = replay_events.last().map(Sequenced::sequence); + + let replay = stream::iter(replay_events); + + let live_messages = live_messages + // Filtering on the broadcast resume point filters out messages + // before resume_at, and filters out messages duplicated from + // `replay_events`. + .flat_map(stream::iter) + .filter(Self::resume(resume_live_at)); + + Ok(replay.chain(live_messages)) + } + + fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { + let filter = Sequence::after(resume_at); + move |event| future::ready(filter(event)) + } +} diff --git a/src/event/broadcaster.rs b/src/event/broadcaster.rs new file mode 100644 index 0000000..3c4efac --- /dev/null +++ b/src/event/broadcaster.rs @@ -0,0 +1,3 @@ +use crate::broadcast; + +pub type Broadcaster = broadcast::Broadcaster<Vec<super::Event>>; diff --git a/src/event/extract.rs b/src/event/extract.rs new file mode 100644 index 0000000..e3021e2 --- /dev/null +++ b/src/event/extract.rs @@ -0,0 +1,85 @@ +use std::ops::Deref; + +use axum::{ + extract::FromRequestParts, + http::{request::Parts, HeaderName, HeaderValue}, +}; +use axum_extra::typed_header::TypedHeader; +use serde::{de::DeserializeOwned, Serialize}; + +// A typed header. When used as a bare extractor, reads from the +// `Last-Event-Id` HTTP header. +pub struct LastEventId<T>(pub T); + +static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); + +impl<T> headers::Header for LastEventId<T> +where + T: Serialize + DeserializeOwned, +{ + fn name() -> &'static HeaderName { + &LAST_EVENT_ID + } + + fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error> + where + I: Iterator<Item = &'i HeaderValue>, + { + let value = values.next().ok_or_else(headers::Error::invalid)?; + let value = value.to_str().map_err(|_| headers::Error::invalid())?; + let value = serde_json::from_str(value).map_err(|_| headers::Error::invalid())?; + Ok(Self(value)) + } + + fn encode<E>(&self, values: &mut E) + where + E: Extend<HeaderValue>, + { + let Self(value) = self; + // Must panic or suppress; the trait provides no other options. + let value = serde_json::to_string(value).expect("value can be encoded as JSON"); + let value = HeaderValue::from_str(&value).expect("LastEventId is a valid header value"); + + values.extend(std::iter::once(value)); + } +} + +#[async_trait::async_trait] +impl<S, T> FromRequestParts<S> for LastEventId<T> +where + S: Send + Sync, + T: Serialize + DeserializeOwned, +{ + type Rejection = <TypedHeader<Self> as FromRequestParts<S>>::Rejection; + + async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> { + // This is purely for ergonomics: it allows `RequestedAt` to be extracted + // without having to wrap it in `Extension<>`. Callers _can_ still do that, + // but they aren't forced to. + let TypedHeader(requested_at) = TypedHeader::from_request_parts(parts, state).await?; + + Ok(requested_at) + } +} + +impl<T> Deref for LastEventId<T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + let Self(header) = self; + header + } +} + +impl<T> From<T> for LastEventId<T> { + fn from(value: T) -> Self { + Self(value) + } +} + +impl<T> LastEventId<T> { + pub fn into_inner(self) -> T { + let Self(value) = self; + value + } +} diff --git a/src/event/mod.rs b/src/event/mod.rs new file mode 100644 index 0000000..1349fe6 --- /dev/null +++ b/src/event/mod.rs @@ -0,0 +1,75 @@ +use crate::{channel, message}; + +pub mod app; +pub mod broadcaster; +mod extract; +pub mod repo; +mod routes; +mod sequence; + +pub use self::{ + routes::router, + sequence::{Instant, Sequence, Sequenced}, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Event { + #[serde(flatten)] + pub instant: Instant, + #[serde(flatten)] + pub kind: Kind, +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + self.instant + } +} + +impl From<channel::Event> for Event { + fn from(event: channel::Event) -> Self { + Self { + instant: event.instant, + kind: event.kind.into(), + } + } +} + +impl From<message::Event> for Event { + fn from(event: message::Event) -> Self { + Self { + instant: event.instant(), + kind: event.kind.into(), + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Kind { + #[serde(rename = "created")] + ChannelCreated(channel::event::Created), + #[serde(rename = "message")] + MessageSent(message::event::Sent), + MessageDeleted(message::event::Deleted), + #[serde(rename = "deleted")] + ChannelDeleted(channel::event::Deleted), +} + +impl From<channel::event::Kind> for Kind { + fn from(kind: channel::event::Kind) -> Self { + match kind { + channel::event::Kind::Created(created) => Self::ChannelCreated(created), + channel::event::Kind::Deleted(deleted) => Self::ChannelDeleted(deleted), + } + } +} + +impl From<message::event::Kind> for Kind { + fn from(kind: message::event::Kind) -> Self { + match kind { + message::event::Kind::Sent(created) => Self::MessageSent(created), + message::event::Kind::Deleted(deleted) => Self::MessageDeleted(deleted), + } + } +} diff --git a/src/event/repo.rs b/src/event/repo.rs new file mode 100644 index 0000000..40d6a53 --- /dev/null +++ b/src/event/repo.rs @@ -0,0 +1,50 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::{ + clock::DateTime, + event::{Instant, Sequence}, +}; + +pub trait Provider { + fn sequence(&mut self) -> Sequences; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn sequence(&mut self) -> Sequences { + Sequences(self) + } +} + +pub struct Sequences<'t>(&'t mut SqliteConnection); + +impl<'c> Sequences<'c> { + pub async fn next(&mut self, at: &DateTime) -> Result<Instant, sqlx::Error> { + let next = sqlx::query_scalar!( + r#" + update event_sequence + set last_value = last_value + 1 + returning last_value as "next_value: Sequence" + "#, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(Instant { + at: *at, + sequence: next, + }) + } + + pub async fn current(&mut self) -> Result<Sequence, sqlx::Error> { + let next = sqlx::query_scalar!( + r#" + select last_value as "last_value: Sequence" + from event_sequence + "#, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(next) + } +} diff --git a/src/event/routes.rs b/src/event/routes.rs new file mode 100644 index 0000000..5b9c7e3 --- /dev/null +++ b/src/event/routes.rs @@ -0,0 +1,92 @@ +use axum::{ + extract::State, + response::{ + sse::{self, Sse}, + IntoResponse, Response, + }, + routing::get, + Router, +}; +use axum_extra::extract::Query; +use futures::stream::{Stream, StreamExt as _}; + +use super::{extract::LastEventId, Event}; +use crate::{ + app::App, + error::{Internal, Unauthorized}, + event::{Sequence, Sequenced as _}, + token::{app::ValidateError, extract::Identity}, +}; + +#[cfg(test)] +mod test; + +pub fn router() -> Router<App> { + Router::new().route("/api/events", get(events)) +} + +#[derive(Default, serde::Deserialize)] +struct EventsQuery { + resume_point: Option<Sequence>, +} + +async fn events( + State(app): State<App>, + identity: Identity, + last_event_id: Option<LastEventId<Sequence>>, + Query(query): Query<EventsQuery>, +) -> Result<Events<impl Stream<Item = Event> + std::fmt::Debug>, EventsError> { + let resume_at = last_event_id + .map(LastEventId::into_inner) + .or(query.resume_point); + + let stream = app.events().subscribe(resume_at).await?; + let stream = app.tokens().limit_stream(identity.token, stream).await?; + + Ok(Events(stream)) +} + +#[derive(Debug)] +struct Events<S>(S); + +impl<S> IntoResponse for Events<S> +where + S: Stream<Item = Event> + Send + 'static, +{ + fn into_response(self) -> Response { + let Self(stream) = self; + let stream = stream.map(sse::Event::try_from); + Sse::new(stream) + .keep_alive(sse::KeepAlive::default()) + .into_response() + } +} + +impl TryFrom<Event> for sse::Event { + type Error = serde_json::Error; + + fn try_from(event: Event) -> Result<Self, Self::Error> { + let id = serde_json::to_string(&event.sequence())?; + let data = serde_json::to_string_pretty(&event)?; + + let event = Self::default().id(id).data(data); + + Ok(event) + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum EventsError { + DatabaseError(#[from] sqlx::Error), + ValidateError(#[from] ValidateError), +} + +impl IntoResponse for EventsError { + fn into_response(self) -> Response { + match self { + Self::ValidateError(ValidateError::InvalidToken) => Unauthorized.into_response(), + other => Internal::from(other).into_response(), + } + } +} diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs new file mode 100644 index 0000000..ba9953e --- /dev/null +++ b/src/event/routes/test.rs @@ -0,0 +1,463 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::{ + future, + stream::{self, StreamExt as _}, +}; + +use crate::{ + event::{routes, Sequenced as _}, + test::fixtures::{self, future::Immediately as _}, +}; + +#[tokio::test] +async fn includes_historical_message() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + // Call the endpoint + + let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; + let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify the structure of the response. + + let event = events + .filter(fixtures::filter::messages()) + .next() + .immediately() + .await + .expect("delivered stored message"); + + assert!(fixtures::event::message_sent(&event, &message)); +} + +#[tokio::test] +async fn includes_live_message() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; + let routes::Events(events) = + routes::events(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify the semantics + + let sender = fixtures::login::create(&app).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + let event = events + .filter(fixtures::filter::messages()) + .next() + .immediately() + .await + .expect("delivered live message"); + + assert!(fixtures::event::message_sent(&event, &message)); +} + +#[tokio::test] +async fn includes_multiple_channels() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app).await; + + let channels = [ + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, + ]; + + let messages = stream::iter(channels) + .then(|channel| { + let app = app.clone(); + let sender = sender.clone(); + let channel = channel.clone(); + async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } + }) + .collect::<Vec<_>>() + .await; + + // Call the endpoint + + let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; + let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify the structure of the response. + + let events = events + .filter(fixtures::filter::messages()) + .take(messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + for message in &messages { + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); + } +} + +#[tokio::test] +async fn sequential_messages() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app).await; + + let messages = vec![ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + // Call the endpoint + + let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; + let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify the structure of the response. + + let mut events = events.filter(|event| { + future::ready( + messages + .iter() + .any(|message| fixtures::event::message_sent(event, message)), + ) + }); + + // Verify delivery in order + for message in &messages { + let event = events + .next() + .immediately() + .await + .expect("undelivered messages remaining"); + + assert!(fixtures::event::message_sent(&event, message)); + } +} + +#[tokio::test] +async fn resumes_from() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app).await; + + let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + let later_messages = vec![ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + // Call the endpoint + + let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; + + let resume_at = { + // First subscription + let routes::Events(events) = routes::events( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); + + let event = events + .filter(fixtures::filter::messages()) + .next() + .immediately() + .await + .expect("delivered events"); + + assert!(fixtures::event::message_sent(&event, &initial_message)); + + event.sequence() + }; + + // Resume after disconnect + let routes::Events(resumed) = routes::events( + State(app), + subscriber, + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Verify the structure of the response. + + let events = resumed + .take(later_messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + for message in &later_messages { + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); + } +} + +// This test verifies a real bug I hit developing the vector-of-sequences +// approach to resuming events. A small omission caused the event IDs in a +// resumed stream to _omit_ channels that were in the original stream until +// those channels also appeared in the resumed stream. +// +// Clients would see something like +// * In the original stream, Cfoo=5,Cbar=8 +// * In the resumed stream, Cfoo=6 (no Cbar sequence number) +// +// Disconnecting and reconnecting a second time, using event IDs from that +// initial period of the first resume attempt, would then cause the second +// resume attempt to restart all other channels from the beginning, and not +// from where the first disconnection happened. +// +// This is a real and valid behaviour for clients! +#[tokio::test] +async fn serial_resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app).await; + let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; + let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; + + let resume_at = { + let initial_messages = [ + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + ]; + + // First subscription + let routes::Events(events) = routes::events( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); + + let events = events + .filter(fixtures::filter::messages()) + .take(initial_messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + for message in &initial_messages { + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); + } + + let event = events.last().expect("this vec is non-empty"); + + event.sequence() + }; + + // Resume after disconnect + let resume_at = { + let resume_messages = [ + // Note that channel_b does not appear here. The buggy behaviour + // would be masked if channel_b happened to send a new message + // into the resumed event stream. + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + ]; + + // Second subscription + let routes::Events(events) = routes::events( + State(app.clone()), + subscriber.clone(), + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); + + let events = events + .filter(fixtures::filter::messages()) + .take(resume_messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + for message in &resume_messages { + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); + } + + let event = events.last().expect("this vec is non-empty"); + + event.sequence() + }; + + // Resume after disconnect a second time + { + // At this point, we can send on either channel and demonstrate the + // problem. The resume point should before both of these messages, but + // after _all_ prior messages. + let final_messages = [ + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + ]; + + // Third subscription + let routes::Events(events) = routes::events( + State(app.clone()), + subscriber.clone(), + Some(resume_at.into()), + Query::default(), + ) + .await + .expect("subscribe never fails"); + + let events = events + .filter(fixtures::filter::messages()) + .take(final_messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + // This set of messages, in particular, _should not_ include any prior + // messages from `initial_messages` or `resume_messages`. + for message in &final_messages { + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); + } + }; +} + +#[tokio::test] +async fn terminates_on_token_expiry() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app).await; + + // Subscribe via the endpoint + + let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber = + fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await; + + let routes::Events(events) = + routes::events(State(app.clone()), subscriber, None, Query::default()) + .await + .expect("subscribe never fails"); + + // Verify the resulting stream's behaviour + + app.tokens() + .expire(&fixtures::now()) + .await + .expect("expiring tokens succeeds"); + + // These should not be delivered. + let messages = [ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + assert!(events + .filter(|event| future::ready( + messages + .iter() + .any(|message| fixtures::event::message_sent(event, message)) + )) + .next() + .immediately() + .await + .is_none()); +} + +#[tokio::test] +async fn terminates_on_logout() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app).await; + + // Subscribe via the endpoint + + let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_token = + fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::now()).await; + let subscriber = + fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await; + + let routes::Events(events) = routes::events( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Verify the resulting stream's behaviour + + app.tokens() + .logout(&subscriber.token) + .await + .expect("expiring tokens succeeds"); + + // These should not be delivered. + let messages = [ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + assert!(events + .filter(|event| future::ready( + messages + .iter() + .any(|message| fixtures::event::message_sent(event, message)) + )) + .next() + .immediately() + .await + .is_none()); +} diff --git a/src/event/sequence.rs b/src/event/sequence.rs new file mode 100644 index 0000000..fbe3711 --- /dev/null +++ b/src/event/sequence.rs @@ -0,0 +1,90 @@ +use std::fmt; + +use crate::clock::DateTime; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Instant { + pub at: DateTime, + #[serde(skip)] + pub sequence: Sequence, +} + +impl From<Instant> for Sequence { + fn from(instant: Instant) -> Self { + instant.sequence + } +} + +#[derive( + Clone, + Copy, + Debug, + Eq, + Ord, + PartialEq, + PartialOrd, + serde::Deserialize, + serde::Serialize, + sqlx::Type, +)] +#[serde(transparent)] +#[sqlx(transparent)] +pub struct Sequence(i64); + +impl fmt::Display for Sequence { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let Self(value) = self; + value.fmt(f) + } +} + +impl Sequence { + pub fn up_to<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool + where + E: Sequenced, + { + move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point) + } + + pub fn after<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool + where + E: Sequenced, + { + move |event| resume_point < Some(event.sequence()) + } + + pub fn start_from<E>(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool + where + E: Sequenced, + { + move |event| resume_point <= event.sequence() + } + + pub fn merge<E>(a: &E, b: &E) -> bool + where + E: Sequenced, + { + a.sequence() < b.sequence() + } +} + +pub trait Sequenced { + fn instant(&self) -> Instant; + + fn sequence(&self) -> Sequence { + self.instant().into() + } +} + +impl<E> Sequenced for &E +where + E: Sequenced, +{ + fn instant(&self) -> Instant { + (*self).instant() + } + + fn sequence(&self) -> Sequence { + (*self).sequence() + } +} |
