summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs35
1 files changed, 27 insertions, 8 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index b7e3a10..6ce826b 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,10 +1,11 @@
use chrono::TimeDelta;
+use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
use crate::{
channel::{repo::Provider as _, Channel},
clock::DateTime,
- event::{broadcaster::Broadcaster, repo::Provider as _, types::ChannelEvent, Sequence},
+ event::{broadcaster::Broadcaster, repo::Provider as _, Sequence},
};
pub struct Channels<'a> {
@@ -27,10 +28,11 @@ impl<'a> Channels<'a> {
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
tx.commit().await?;
- self.events
- .broadcast(&ChannelEvent::created(channel.clone()));
+ for event in channel.events() {
+ self.events.broadcast(event);
+ }
- Ok(channel)
+ Ok(channel.snapshot())
}
pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> {
@@ -38,6 +40,16 @@ impl<'a> Channels<'a> {
let channels = tx.channels().all(resume_point).await?;
tx.commit().await?;
+ let channels = channels
+ .into_iter()
+ .filter_map(|channel| {
+ channel
+ .events()
+ .filter(Sequence::up_to(resume_point))
+ .collect()
+ })
+ .collect();
+
Ok(channels)
}
@@ -51,14 +63,21 @@ impl<'a> Channels<'a> {
let mut events = Vec::with_capacity(expired.len());
for channel in expired {
let deleted = tx.sequence().next(relative_to).await?;
- let event = tx.channels().delete(&channel, &deleted).await?;
- events.push(event);
+ let channel = tx.channels().delete(&channel, &deleted).await?;
+ events.push(
+ channel
+ .events()
+ .filter(Sequence::start_from(deleted.sequence)),
+ );
}
tx.commit().await?;
- for event in events {
- self.events.broadcast(&event);
+ for event in events
+ .into_iter()
+ .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ {
+ self.events.broadcast(event);
}
Ok(())