summaryrefslogtreecommitdiff
path: root/src/events
diff options
context:
space:
mode:
Diffstat (limited to 'src/events')
-rw-r--r--src/events/app.rs24
-rw-r--r--src/events/routes/test.rs18
-rw-r--r--src/events/types.rs40
3 files changed, 65 insertions, 17 deletions
diff --git a/src/events/app.rs b/src/events/app.rs
index 043a29b..134e86a 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -11,7 +11,7 @@ use sqlx::sqlite::SqlitePool;
use super::{
broadcaster::Broadcaster,
repo::message::Provider as _,
- types::{self, ResumePoint},
+ types::{self, ChannelEvent, ResumePoint},
};
use crate::{
clock::DateTime,
@@ -66,6 +66,17 @@ impl<'a> Events<'a> {
let mut tx = self.db.begin().await?;
let channels = tx.channels().all().await?;
+ let created_events = {
+ let resume_at = resume_at.clone();
+ let channels = channels.clone();
+ stream::iter(
+ channels
+ .into_iter()
+ .map(ChannelEvent::created)
+ .filter(move |event| resume_at.not_after(event)),
+ )
+ };
+
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.broadcaster.subscribe();
@@ -104,9 +115,9 @@ impl<'a> Events<'a> {
// stored_messages.
.filter(Self::resume(resume_live_at));
- Ok(replay
- .chain(live_messages)
- .scan(resume_at, |resume_point, event| {
+ Ok(created_events.chain(replay).chain(live_messages).scan(
+ resume_at,
+ |resume_point, event| {
let channel = &event.channel.id;
let sequence = event.sequence;
resume_point.advance(channel, sequence);
@@ -114,13 +125,14 @@ impl<'a> Events<'a> {
let event = types::ResumableEvent(resume_point.clone(), event);
future::ready(Some(event))
- }))
+ },
+ ))
}
fn resume(
resume_at: ResumePoint,
) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
- move |event| future::ready(resume_at < event.sequence())
+ move |event| future::ready(resume_at.not_after(event))
}
fn skip_expired(
expire_at: &DateTime,
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index f289225..55ada95 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -15,7 +15,7 @@ async fn includes_historical_message() {
let app = fixtures::scratch_app().await;
let sender = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
// Call the endpoint
@@ -42,7 +42,7 @@ async fn includes_live_message() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
// Call the endpoint
@@ -75,8 +75,8 @@ async fn includes_multiple_channels() {
let sender = fixtures::login::create(&app).await;
let channels = [
- fixtures::channel::create(&app).await,
- fixtures::channel::create(&app).await,
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ fixtures::channel::create(&app, &fixtures::now()).await,
];
let messages = stream::iter(channels)
@@ -117,7 +117,7 @@ async fn sequential_messages() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app).await;
let messages = vec![
@@ -156,7 +156,7 @@ async fn resumes_from() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app).await;
let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
@@ -229,8 +229,8 @@ async fn serial_resume() {
let app = fixtures::scratch_app().await;
let sender = fixtures::login::create(&app).await;
- let channel_a = fixtures::channel::create(&app).await;
- let channel_b = fixtures::channel::create(&app).await;
+ let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
+ let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
// Call the endpoint
@@ -346,7 +346,7 @@ async fn removes_expired_messages() {
// Set up the environment
let app = fixtures::scratch_app().await;
let sender = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await;
let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
diff --git a/src/events/types.rs b/src/events/types.rs
index 6747afc..7c0e0a4 100644
--- a/src/events/types.rs
+++ b/src/events/types.rs
@@ -11,6 +11,7 @@ use crate::{
#[derive(
Debug,
+ Default,
Eq,
Ord,
PartialEq,
@@ -59,7 +60,30 @@ impl ResumePoint {
let Self(elements) = self;
elements.get(channel).copied()
}
+
+ pub fn not_after(&self, event: impl ResumeElement) -> bool {
+ let Self(elements) = self;
+ let (channel, sequence) = event.element();
+
+ elements
+ .get(channel)
+ .map_or(true, |resume_at| resume_at < &sequence)
+ }
}
+
+pub trait ResumeElement {
+ fn element(&self) -> (&channel::Id, Sequence);
+}
+
+impl<T> ResumeElement for &T
+where
+ T: ResumeElement,
+{
+ fn element(&self) -> (&channel::Id, Sequence) {
+ (*self).element()
+ }
+}
+
#[derive(Clone, Debug)]
pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent);
@@ -74,14 +98,26 @@ pub struct ChannelEvent {
}
impl ChannelEvent {
- pub fn sequence(&self) -> ResumePoint {
- ResumePoint::singleton(&self.channel.id, self.sequence)
+ pub fn created(channel: Channel) -> Self {
+ Self {
+ at: channel.created_at,
+ sequence: Sequence::default(),
+ channel,
+ data: ChannelEventData::Created,
+ }
+ }
+}
+
+impl ResumeElement for ChannelEvent {
+ fn element(&self) -> (&channel::Id, Sequence) {
+ (&self.channel.id, self.sequence)
}
}
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ChannelEventData {
+ Created,
Message(MessageEvent),
}