summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-02 12:25:36 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-03 19:25:41 -0400
commitec804134c33aedb001c426c5f42f43f53c47848f (patch)
treec62b59ab5cdd438f47a5f9cc35fdc712d362af19 /src/channel/app.rs
parent469613872f6fb19f4579b387e19b2bc38fa52f51 (diff)
Represent channels and messages using a split "History" and "Snapshot" model.
This separates the code that figures out what happened to an entity from the code that represents it to a user, and makes it easier to compute a snapshot at a point in time (for things like bootstrap). It also makes the internal logic a bit easier to follow, since it's easier to tell whether you're working with a point in time or with the whole recorded history. This hefty.
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs35
1 files changed, 27 insertions, 8 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index b7e3a10..6ce826b 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,10 +1,11 @@
use chrono::TimeDelta;
+use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
use crate::{
channel::{repo::Provider as _, Channel},
clock::DateTime,
- event::{broadcaster::Broadcaster, repo::Provider as _, types::ChannelEvent, Sequence},
+ event::{broadcaster::Broadcaster, repo::Provider as _, Sequence},
};
pub struct Channels<'a> {
@@ -27,10 +28,11 @@ impl<'a> Channels<'a> {
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
tx.commit().await?;
- self.events
- .broadcast(&ChannelEvent::created(channel.clone()));
+ for event in channel.events() {
+ self.events.broadcast(event);
+ }
- Ok(channel)
+ Ok(channel.snapshot())
}
pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> {
@@ -38,6 +40,16 @@ impl<'a> Channels<'a> {
let channels = tx.channels().all(resume_point).await?;
tx.commit().await?;
+ let channels = channels
+ .into_iter()
+ .filter_map(|channel| {
+ channel
+ .events()
+ .filter(Sequence::up_to(resume_point))
+ .collect()
+ })
+ .collect();
+
Ok(channels)
}
@@ -51,14 +63,21 @@ impl<'a> Channels<'a> {
let mut events = Vec::with_capacity(expired.len());
for channel in expired {
let deleted = tx.sequence().next(relative_to).await?;
- let event = tx.channels().delete(&channel, &deleted).await?;
- events.push(event);
+ let channel = tx.channels().delete(&channel, &deleted).await?;
+ events.push(
+ channel
+ .events()
+ .filter(Sequence::start_from(deleted.sequence)),
+ );
}
tx.commit().await?;
- for event in events {
- self.events.broadcast(&event);
+ for event in events
+ .into_iter()
+ .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ {
+ self.events.broadcast(event);
}
Ok(())