summaryrefslogtreecommitdiff
path: root/src/event
diff options
context:
space:
mode:
Diffstat (limited to 'src/event')
-rw-r--r--src/event/app.rs122
-rw-r--r--src/event/broadcaster.rs4
-rw-r--r--src/event/mod.rs73
-rw-r--r--src/event/repo.rs (renamed from src/event/repo/sequence.rs)0
-rw-r--r--src/event/repo/message.rs196
-rw-r--r--src/event/repo/mod.rs4
-rw-r--r--src/event/routes.rs14
-rw-r--r--src/event/routes/test.rs98
-rw-r--r--src/event/sequence.rs59
-rw-r--r--src/event/types.rs85
10 files changed, 222 insertions, 433 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
index 5e9e79a..e58bea9 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -1,22 +1,15 @@
-use chrono::TimeDelta;
use futures::{
future,
stream::{self, StreamExt as _},
Stream,
};
+use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;
-use super::{
- broadcaster::Broadcaster,
- repo::message::Provider as _,
- types::{self, ChannelEvent},
-};
+use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced};
use crate::{
channel::{self, repo::Provider as _},
- clock::DateTime,
- db::NotFound as _,
- event::{repo::Provider as _, Sequence},
- login::Login,
+ message::{self, repo::Provider as _},
};
pub struct Events<'a> {
@@ -29,111 +22,52 @@ impl<'a> Events<'a> {
Self { db, events }
}
- pub async fn send(
- &self,
- login: &Login,
- channel: &channel::Id,
- body: &str,
- sent_at: &DateTime,
- ) -> Result<types::ChannelEvent, EventsError> {
- let mut tx = self.db.begin().await?;
- let channel = tx
- .channels()
- .by_id(channel)
- .await
- .not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
- let sent = tx.sequence().next(sent_at).await?;
- let event = tx
- .message_events()
- .create(login, &channel, &sent, body)
- .await?;
- tx.commit().await?;
-
- self.events.broadcast(&event);
- Ok(event)
- }
-
- pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
- // Somewhat arbitrarily, expire after 90 days.
- let expire_at = relative_to.to_owned() - TimeDelta::days(90);
-
- let mut tx = self.db.begin().await?;
- let expired = tx.message_events().expired(&expire_at).await?;
-
- let mut events = Vec::with_capacity(expired.len());
- for (channel, message) in expired {
- let deleted = tx.sequence().next(relative_to).await?;
- let event = tx
- .message_events()
- .delete(&channel, &message, &deleted)
- .await?;
- events.push(event);
- }
-
- tx.commit().await?;
-
- for event in events {
- self.events.broadcast(&event);
- }
-
- Ok(())
- }
-
pub async fn subscribe(
&self,
resume_at: Option<Sequence>,
- ) -> Result<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug, sqlx::Error> {
+ ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> {
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.events.subscribe();
let mut tx = self.db.begin().await?;
- let channels = tx.channels().replay(resume_at).await?;
+ let channels = tx.channels().replay(resume_at).await?;
let channel_events = channels
- .into_iter()
- .map(ChannelEvent::created)
- .filter(move |event| {
- resume_at.map_or(true, |resume_at| Sequence::from(event) > resume_at)
- });
-
- let message_events = tx.message_events().replay(resume_at).await?;
-
- let mut replay_events = channel_events
- .into_iter()
- .chain(message_events.into_iter())
+ .iter()
+ .map(channel::History::events)
+ .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ .filter(Sequence::after(resume_at))
+ .map(Event::from);
+
+ let messages = tx.messages().replay(resume_at).await?;
+ let message_events = messages
+ .iter()
+ .map(message::History::events)
+ .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ .filter(Sequence::after(resume_at))
+ .map(Event::from);
+
+ let replay_events = channel_events
+ .merge_by(message_events, |a, b| {
+ a.instant.sequence < b.instant.sequence
+ })
.collect::<Vec<_>>();
- replay_events.sort_by_key(|event| Sequence::from(event));
- let resume_live_at = replay_events.last().map(Sequence::from);
+ let resume_live_at = replay_events.last().map(Sequenced::sequence);
let replay = stream::iter(replay_events);
- // no skip_expired or resume transforms for stored_messages, as it's
- // constructed not to contain messages meeting either criterion.
- //
- // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call;
- // * resume is redundant with the resume_at argument to
- // `tx.broadcasts().replay(…)`.
let live_messages = live_messages
// Filtering on the broadcast resume point filters out messages
// before resume_at, and filters out messages duplicated from
- // stored_messages.
+ // `replay_events`.
.filter(Self::resume(resume_live_at));
Ok(replay.chain(live_messages))
}
- fn resume(
- resume_at: Option<Sequence>,
- ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
- move |event| future::ready(resume_at < Some(Sequence::from(event)))
+ fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
+ let filter = Sequence::after(resume_at);
+ move |event| future::ready(filter(event))
}
}
-
-#[derive(Debug, thiserror::Error)]
-pub enum EventsError {
- #[error("channel {0} not found")]
- ChannelNotFound(channel::Id),
- #[error(transparent)]
- DatabaseError(#[from] sqlx::Error),
-}
diff --git a/src/event/broadcaster.rs b/src/event/broadcaster.rs
index 92f631f..de2513a 100644
--- a/src/event/broadcaster.rs
+++ b/src/event/broadcaster.rs
@@ -1,3 +1,3 @@
-use crate::{broadcast, event::types};
+use crate::broadcast;
-pub type Broadcaster = broadcast::Broadcaster<types::ChannelEvent>;
+pub type Broadcaster = broadcast::Broadcaster<super::Event>;
diff --git a/src/event/mod.rs b/src/event/mod.rs
index c982d3a..1503b77 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -1,18 +1,75 @@
+use crate::{channel, message};
+
pub mod app;
pub mod broadcaster;
mod extract;
pub mod repo;
mod routes;
mod sequence;
-pub mod types;
-use crate::clock::DateTime;
+pub use self::{
+ routes::router,
+ sequence::{Instant, Sequence, Sequenced},
+};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Event {
+ #[serde(flatten)]
+ pub instant: Instant,
+ #[serde(flatten)]
+ pub kind: Kind,
+}
+
+impl Sequenced for Event {
+ fn instant(&self) -> Instant {
+ self.instant
+ }
+}
+
+impl From<channel::Event> for Event {
+ fn from(event: channel::Event) -> Self {
+ Self {
+ instant: event.instant,
+ kind: event.kind.into(),
+ }
+ }
+}
+
+impl From<message::Event> for Event {
+ fn from(event: message::Event) -> Self {
+ Self {
+ instant: event.instant,
+ kind: event.kind.into(),
+ }
+ }
+}
-pub use self::{routes::router, sequence::Sequence};
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum Kind {
+ #[serde(rename = "created")]
+ ChannelCreated(channel::event::Created),
+ #[serde(rename = "message")]
+ MessageSent(message::event::Sent),
+ MessageDeleted(message::event::Deleted),
+ #[serde(rename = "deleted")]
+ ChannelDeleted(channel::event::Deleted),
+}
+
+impl From<channel::event::Kind> for Kind {
+ fn from(kind: channel::event::Kind) -> Self {
+ match kind {
+ channel::event::Kind::Created(created) => Self::ChannelCreated(created),
+ channel::event::Kind::Deleted(deleted) => Self::ChannelDeleted(deleted),
+ }
+ }
+}
-#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Instant {
- pub at: DateTime,
- #[serde(skip)]
- pub sequence: Sequence,
+impl From<message::event::Kind> for Kind {
+ fn from(kind: message::event::Kind) -> Self {
+ match kind {
+ message::event::Kind::Sent(created) => Self::MessageSent(created),
+ message::event::Kind::Deleted(deleted) => Self::MessageDeleted(deleted),
+ }
+ }
}
diff --git a/src/event/repo/sequence.rs b/src/event/repo.rs
index 40d6a53..40d6a53 100644
--- a/src/event/repo/sequence.rs
+++ b/src/event/repo.rs
diff --git a/src/event/repo/message.rs b/src/event/repo/message.rs
deleted file mode 100644
index f29c8a4..0000000
--- a/src/event/repo/message.rs
+++ /dev/null
@@ -1,196 +0,0 @@
-use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
-
-use crate::{
- channel::{self, Channel},
- clock::DateTime,
- event::{types, Instant, Sequence},
- login::{self, Login},
- message::{self, Message},
-};
-
-pub trait Provider {
- fn message_events(&mut self) -> Events;
-}
-
-impl<'c> Provider for Transaction<'c, Sqlite> {
- fn message_events(&mut self) -> Events {
- Events(self)
- }
-}
-
-pub struct Events<'t>(&'t mut SqliteConnection);
-
-impl<'c> Events<'c> {
- pub async fn create(
- &mut self,
- sender: &Login,
- channel: &Channel,
- sent: &Instant,
- body: &str,
- ) -> Result<types::ChannelEvent, sqlx::Error> {
- let id = message::Id::generate();
-
- let message = sqlx::query!(
- r#"
- insert into message
- (id, channel, sender, sent_at, sent_sequence, body)
- values ($1, $2, $3, $4, $5, $6)
- returning
- id as "id: message::Id",
- sender as "sender: login::Id",
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence",
- body
- "#,
- id,
- channel.id,
- sender.id,
- sent.at,
- sent.sequence,
- body,
- )
- .map(|row| types::ChannelEvent {
- instant: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
- data: types::MessageEvent {
- channel: channel.clone(),
- sender: sender.clone(),
- message: Message {
- id: row.id,
- body: row.body,
- },
- }
- .into(),
- })
- .fetch_one(&mut *self.0)
- .await?;
-
- Ok(message)
- }
-
- pub async fn delete(
- &mut self,
- channel: &Channel,
- message: &message::Id,
- deleted: &Instant,
- ) -> Result<types::ChannelEvent, sqlx::Error> {
- sqlx::query_scalar!(
- r#"
- delete from message
- where id = $1
- returning 1 as "row: i64"
- "#,
- message,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- Ok(types::ChannelEvent {
- instant: Instant {
- at: deleted.at,
- sequence: deleted.sequence,
- },
- data: types::MessageDeletedEvent {
- channel: channel.clone(),
- message: message.clone(),
- }
- .into(),
- })
- }
-
- pub async fn expired(
- &mut self,
- expire_at: &DateTime,
- ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> {
- let messages = sqlx::query!(
- r#"
- select
- channel.id as "channel_id: channel::Id",
- channel.name as "channel_name",
- channel.created_at as "channel_created_at: DateTime",
- channel.created_sequence as "channel_created_sequence: Sequence",
- message.id as "message: message::Id"
- from message
- join channel on message.channel = channel.id
- join login as sender on message.sender = sender.id
- where sent_at < $1
- "#,
- expire_at,
- )
- .map(|row| {
- (
- Channel {
- id: row.channel_id,
- name: row.channel_name,
- created: Instant {
- at: row.channel_created_at,
- sequence: row.channel_created_sequence,
- },
- },
- row.message,
- )
- })
- .fetch_all(&mut *self.0)
- .await?;
-
- Ok(messages)
- }
-
- pub async fn replay(
- &mut self,
- resume_at: Option<Sequence>,
- ) -> Result<Vec<types::ChannelEvent>, sqlx::Error> {
- let events = sqlx::query!(
- r#"
- select
- message.id as "id: message::Id",
- channel.id as "channel_id: channel::Id",
- channel.name as "channel_name",
- channel.created_at as "channel_created_at: DateTime",
- channel.created_sequence as "channel_created_sequence: Sequence",
- sender.id as "sender_id: login::Id",
- sender.name as sender_name,
- message.sent_at as "sent_at: DateTime",
- message.sent_sequence as "sent_sequence: Sequence",
- message.body
- from message
- join channel on message.channel = channel.id
- join login as sender on message.sender = sender.id
- where coalesce(message.sent_sequence > $1, true)
- order by sent_sequence asc
- "#,
- resume_at,
- )
- .map(|row| types::ChannelEvent {
- instant: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
- data: types::MessageEvent {
- channel: Channel {
- id: row.channel_id,
- name: row.channel_name,
- created: Instant {
- at: row.channel_created_at,
- sequence: row.channel_created_sequence,
- },
- },
- sender: Login {
- id: row.sender_id,
- name: row.sender_name,
- },
- message: Message {
- id: row.id,
- body: row.body,
- },
- }
- .into(),
- })
- .fetch_all(&mut *self.0)
- .await?;
-
- Ok(events)
- }
-}
diff --git a/src/event/repo/mod.rs b/src/event/repo/mod.rs
deleted file mode 100644
index cee840c..0000000
--- a/src/event/repo/mod.rs
+++ /dev/null
@@ -1,4 +0,0 @@
-pub mod message;
-mod sequence;
-
-pub use self::sequence::Provider;
diff --git a/src/event/routes.rs b/src/event/routes.rs
index c87bfb2..5b9c7e3 100644
--- a/src/event/routes.rs
+++ b/src/event/routes.rs
@@ -10,11 +10,11 @@ use axum::{
use axum_extra::extract::Query;
use futures::stream::{Stream, StreamExt as _};
-use super::{extract::LastEventId, types};
+use super::{extract::LastEventId, Event};
use crate::{
app::App,
error::{Internal, Unauthorized},
- event::Sequence,
+ event::{Sequence, Sequenced as _},
token::{app::ValidateError, extract::Identity},
};
@@ -35,7 +35,7 @@ async fn events(
identity: Identity,
last_event_id: Option<LastEventId<Sequence>>,
Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug>, EventsError> {
+) -> Result<Events<impl Stream<Item = Event> + std::fmt::Debug>, EventsError> {
let resume_at = last_event_id
.map(LastEventId::into_inner)
.or(query.resume_point);
@@ -51,7 +51,7 @@ struct Events<S>(S);
impl<S> IntoResponse for Events<S>
where
- S: Stream<Item = types::ChannelEvent> + Send + 'static,
+ S: Stream<Item = Event> + Send + 'static,
{
fn into_response(self) -> Response {
let Self(stream) = self;
@@ -62,11 +62,11 @@ where
}
}
-impl TryFrom<types::ChannelEvent> for sse::Event {
+impl TryFrom<Event> for sse::Event {
type Error = serde_json::Error;
- fn try_from(event: types::ChannelEvent) -> Result<Self, Self::Error> {
- let id = serde_json::to_string(&Sequence::from(&event))?;
+ fn try_from(event: Event) -> Result<Self, Self::Error> {
+ let id = serde_json::to_string(&event.sequence())?;
let data = serde_json::to_string_pretty(&event)?;
let event = Self::default().id(id).data(data);
diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs
index 68b55cc..ba9953e 100644
--- a/src/event/routes/test.rs
+++ b/src/event/routes/test.rs
@@ -6,7 +6,7 @@ use futures::{
};
use crate::{
- event::{routes, Sequence},
+ event::{routes, Sequenced as _},
test::fixtures::{self, future::Immediately as _},
};
@@ -17,7 +17,7 @@ async fn includes_historical_message() {
let app = fixtures::scratch_app().await;
let sender = fixtures::login::create(&app).await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
// Call the endpoint
@@ -36,7 +36,7 @@ async fn includes_historical_message() {
.await
.expect("delivered stored message");
- assert_eq!(message, event);
+ assert!(fixtures::event::message_sent(&event, &message));
}
#[tokio::test]
@@ -58,7 +58,7 @@ async fn includes_live_message() {
// Verify the semantics
let sender = fixtures::login::create(&app).await;
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
let event = events
.filter(fixtures::filter::messages())
@@ -67,7 +67,7 @@ async fn includes_live_message() {
.await
.expect("delivered live message");
- assert_eq!(message, event);
+ assert!(fixtures::event::message_sent(&event, &message));
}
#[tokio::test]
@@ -87,7 +87,7 @@ async fn includes_multiple_channels() {
let app = app.clone();
let sender = sender.clone();
let channel = channel.clone();
- async move { fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await }
+ async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await }
})
.collect::<Vec<_>>()
.await;
@@ -110,7 +110,9 @@ async fn includes_multiple_channels() {
.await;
for message in &messages {
- assert!(events.iter().any(|event| { event == message }));
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
}
}
@@ -123,9 +125,9 @@ async fn sequential_messages() {
let sender = fixtures::login::create(&app).await;
let messages = vec![
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
// Call the endpoint
@@ -138,7 +140,13 @@ async fn sequential_messages() {
// Verify the structure of the response.
- let mut events = events.filter(|event| future::ready(messages.contains(event)));
+ let mut events = events.filter(|event| {
+ future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message)),
+ )
+ });
// Verify delivery in order
for message in &messages {
@@ -148,7 +156,7 @@ async fn sequential_messages() {
.await
.expect("undelivered messages remaining");
- assert_eq!(message, &event);
+ assert!(fixtures::event::message_sent(&event, message));
}
}
@@ -160,11 +168,11 @@ async fn resumes_from() {
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app).await;
- let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+ let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
let later_messages = vec![
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
// Call the endpoint
@@ -190,9 +198,9 @@ async fn resumes_from() {
.await
.expect("delivered events");
- assert_eq!(initial_message, event);
+ assert!(fixtures::event::message_sent(&event, &initial_message));
- Sequence::from(&event)
+ event.sequence()
};
// Resume after disconnect
@@ -214,7 +222,9 @@ async fn resumes_from() {
.await;
for message in &later_messages {
- assert!(events.iter().any(|event| event == message));
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
}
}
@@ -249,8 +259,8 @@ async fn serial_resume() {
let resume_at = {
let initial_messages = [
- fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
];
// First subscription
@@ -271,12 +281,14 @@ async fn serial_resume() {
.await;
for message in &initial_messages {
- assert!(events.iter().any(|event| event == message));
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
}
let event = events.last().expect("this vec is non-empty");
- Sequence::from(event)
+ event.sequence()
};
// Resume after disconnect
@@ -285,8 +297,8 @@ async fn serial_resume() {
// Note that channel_b does not appear here. The buggy behaviour
// would be masked if channel_b happened to send a new message
// into the resumed event stream.
- fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
];
// Second subscription
@@ -307,12 +319,14 @@ async fn serial_resume() {
.await;
for message in &resume_messages {
- assert!(events.iter().any(|event| event == message));
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
}
let event = events.last().expect("this vec is non-empty");
- Sequence::from(event)
+ event.sequence()
};
// Resume after disconnect a second time
@@ -321,8 +335,8 @@ async fn serial_resume() {
// problem. The resume point should before both of these messages, but
// after _all_ prior messages.
let final_messages = [
- fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
];
// Third subscription
@@ -345,7 +359,9 @@ async fn serial_resume() {
// This set of messages, in particular, _should not_ include any prior
// messages from `initial_messages` or `resume_messages`.
for message in &final_messages {
- assert!(events.iter().any(|event| event == message));
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
}
};
}
@@ -378,13 +394,17 @@ async fn terminates_on_token_expiry() {
// These should not be delivered.
let messages = [
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
assert!(events
- .filter(|event| future::ready(messages.contains(event)))
+ .filter(|event| future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message))
+ ))
.next()
.immediately()
.await
@@ -425,13 +445,17 @@ async fn terminates_on_logout() {
// These should not be delivered.
let messages = [
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
assert!(events
- .filter(|event| future::ready(messages.contains(event)))
+ .filter(|event| future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message))
+ ))
.next()
.immediately()
.await
diff --git a/src/event/sequence.rs b/src/event/sequence.rs
index 9ebddd7..c566156 100644
--- a/src/event/sequence.rs
+++ b/src/event/sequence.rs
@@ -1,5 +1,20 @@
use std::fmt;
+use crate::clock::DateTime;
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Instant {
+ pub at: DateTime,
+ #[serde(skip)]
+ pub sequence: Sequence,
+}
+
+impl From<Instant> for Sequence {
+ fn from(instant: Instant) -> Self {
+ instant.sequence
+ }
+}
+
#[derive(
Clone,
Copy,
@@ -22,3 +37,47 @@ impl fmt::Display for Sequence {
value.fmt(f)
}
}
+
+impl Sequence {
+ pub fn up_to<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool
+ where
+ E: Sequenced,
+ {
+ move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point)
+ }
+
+ pub fn after<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool
+ where
+ E: Sequenced,
+ {
+ move |event| resume_point < Some(event.sequence())
+ }
+
+ pub fn start_from<E>(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool
+ where
+ E: Sequenced,
+ {
+ move |event| resume_point <= event.sequence()
+ }
+}
+
+pub trait Sequenced {
+ fn instant(&self) -> Instant;
+
+ fn sequence(&self) -> Sequence {
+ self.instant().into()
+ }
+}
+
+impl<E> Sequenced for &E
+where
+ E: Sequenced,
+{
+ fn instant(&self) -> Instant {
+ (*self).instant()
+ }
+
+ fn sequence(&self) -> Sequence {
+ (*self).sequence()
+ }
+}
diff --git a/src/event/types.rs b/src/event/types.rs
deleted file mode 100644
index 2324dc1..0000000
--- a/src/event/types.rs
+++ /dev/null
@@ -1,85 +0,0 @@
-use crate::{
- channel::{self, Channel},
- event::{Instant, Sequence},
- login::Login,
- message::{self, Message},
-};
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct ChannelEvent {
- #[serde(flatten)]
- pub instant: Instant,
- #[serde(flatten)]
- pub data: ChannelEventData,
-}
-
-impl ChannelEvent {
- pub fn created(channel: Channel) -> Self {
- Self {
- instant: channel.created,
- data: CreatedEvent { channel }.into(),
- }
- }
-}
-
-impl<'c> From<&'c ChannelEvent> for Sequence {
- fn from(event: &'c ChannelEvent) -> Self {
- event.instant.sequence
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-#[serde(tag = "type", rename_all = "snake_case")]
-pub enum ChannelEventData {
- Created(CreatedEvent),
- Message(MessageEvent),
- MessageDeleted(MessageDeletedEvent),
- Deleted(DeletedEvent),
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct CreatedEvent {
- pub channel: Channel,
-}
-
-impl From<CreatedEvent> for ChannelEventData {
- fn from(event: CreatedEvent) -> Self {
- Self::Created(event)
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct MessageEvent {
- pub channel: Channel,
- pub sender: Login,
- pub message: Message,
-}
-
-impl From<MessageEvent> for ChannelEventData {
- fn from(event: MessageEvent) -> Self {
- Self::Message(event)
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct MessageDeletedEvent {
- pub channel: Channel,
- pub message: message::Id,
-}
-
-impl From<MessageDeletedEvent> for ChannelEventData {
- fn from(event: MessageDeletedEvent) -> Self {
- Self::MessageDeleted(event)
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct DeletedEvent {
- pub channel: channel::Id,
-}
-
-impl From<DeletedEvent> for ChannelEventData {
- fn from(event: DeletedEvent) -> Self {
- Self::Deleted(event)
- }
-}