diff options
Diffstat (limited to 'src/events')
| -rw-r--r-- | src/events/app.rs | 24 | ||||
| -rw-r--r-- | src/events/routes/test.rs | 18 | ||||
| -rw-r--r-- | src/events/types.rs | 40 |
3 files changed, 65 insertions, 17 deletions
diff --git a/src/events/app.rs b/src/events/app.rs index 043a29b..134e86a 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -11,7 +11,7 @@ use sqlx::sqlite::SqlitePool; use super::{ broadcaster::Broadcaster, repo::message::Provider as _, - types::{self, ResumePoint}, + types::{self, ChannelEvent, ResumePoint}, }; use crate::{ clock::DateTime, @@ -66,6 +66,17 @@ impl<'a> Events<'a> { let mut tx = self.db.begin().await?; let channels = tx.channels().all().await?; + let created_events = { + let resume_at = resume_at.clone(); + let channels = channels.clone(); + stream::iter( + channels + .into_iter() + .map(ChannelEvent::created) + .filter(move |event| resume_at.not_after(event)), + ) + }; + // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.broadcaster.subscribe(); @@ -104,9 +115,9 @@ impl<'a> Events<'a> { // stored_messages. .filter(Self::resume(resume_live_at)); - Ok(replay - .chain(live_messages) - .scan(resume_at, |resume_point, event| { + Ok(created_events.chain(replay).chain(live_messages).scan( + resume_at, + |resume_point, event| { let channel = &event.channel.id; let sequence = event.sequence; resume_point.advance(channel, sequence); @@ -114,13 +125,14 @@ impl<'a> Events<'a> { let event = types::ResumableEvent(resume_point.clone(), event); future::ready(Some(event)) - })) + }, + )) } fn resume( resume_at: ResumePoint, ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { - move |event| future::ready(resume_at < event.sequence()) + move |event| future::ready(resume_at.not_after(event)) } fn skip_expired( expire_at: &DateTime, diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index f289225..55ada95 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -15,7 +15,7 @@ async fn includes_historical_message() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; // Call the endpoint @@ -42,7 +42,7 @@ async fn includes_live_message() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint @@ -75,8 +75,8 @@ async fn includes_multiple_channels() { let sender = fixtures::login::create(&app).await; let channels = [ - fixtures::channel::create(&app).await, - fixtures::channel::create(&app).await, + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, ]; let messages = stream::iter(channels) @@ -117,7 +117,7 @@ async fn sequential_messages() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; let messages = vec![ @@ -156,7 +156,7 @@ async fn resumes_from() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; @@ -229,8 +229,8 @@ async fn serial_resume() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel_a = fixtures::channel::create(&app).await; - let channel_b = fixtures::channel::create(&app).await; + let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; + let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint @@ -346,7 +346,7 @@ async fn removes_expired_messages() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; diff --git a/src/events/types.rs b/src/events/types.rs index 6747afc..7c0e0a4 100644 --- a/src/events/types.rs +++ b/src/events/types.rs @@ -11,6 +11,7 @@ use crate::{ #[derive( Debug, + Default, Eq, Ord, PartialEq, @@ -59,7 +60,30 @@ impl ResumePoint { let Self(elements) = self; elements.get(channel).copied() } + + pub fn not_after(&self, event: impl ResumeElement) -> bool { + let Self(elements) = self; + let (channel, sequence) = event.element(); + + elements + .get(channel) + .map_or(true, |resume_at| resume_at < &sequence) + } } + +pub trait ResumeElement { + fn element(&self) -> (&channel::Id, Sequence); +} + +impl<T> ResumeElement for &T +where + T: ResumeElement, +{ + fn element(&self) -> (&channel::Id, Sequence) { + (*self).element() + } +} + #[derive(Clone, Debug)] pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent); @@ -74,14 +98,26 @@ pub struct ChannelEvent { } impl ChannelEvent { - pub fn sequence(&self) -> ResumePoint { - ResumePoint::singleton(&self.channel.id, self.sequence) + pub fn created(channel: Channel) -> Self { + Self { + at: channel.created_at, + sequence: Sequence::default(), + channel, + data: ChannelEventData::Created, + } + } +} + +impl ResumeElement for ChannelEvent { + fn element(&self) -> (&channel::Id, Sequence) { + (&self.channel.id, self.sequence) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum ChannelEventData { + Created, Message(MessageEvent), } |
