diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-02 12:25:36 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-03 19:25:41 -0400 |
| commit | ec804134c33aedb001c426c5f42f43f53c47848f (patch) | |
| tree | c62b59ab5cdd438f47a5f9cc35fdc712d362af19 /src/channel/app.rs | |
| parent | 469613872f6fb19f4579b387e19b2bc38fa52f51 (diff) | |
Represent channels and messages using a split "History" and "Snapshot" model.
This separates the code that figures out what happened to an entity from the code that represents it to a user, and makes it easier to compute a snapshot at a point in time (for things like bootstrap). It also makes the internal logic a bit easier to follow, since it's easier to tell whether you're working with a point in time or with the whole recorded history.
This hefty.
Diffstat (limited to 'src/channel/app.rs')
| -rw-r--r-- | src/channel/app.rs | 35 |
1 files changed, 27 insertions, 8 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index b7e3a10..6ce826b 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,10 +1,11 @@ use chrono::TimeDelta; +use itertools::Itertools; use sqlx::sqlite::SqlitePool; use crate::{ channel::{repo::Provider as _, Channel}, clock::DateTime, - event::{broadcaster::Broadcaster, repo::Provider as _, types::ChannelEvent, Sequence}, + event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, }; pub struct Channels<'a> { @@ -27,10 +28,11 @@ impl<'a> Channels<'a> { .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; - self.events - .broadcast(&ChannelEvent::created(channel.clone())); + for event in channel.events() { + self.events.broadcast(event); + } - Ok(channel) + Ok(channel.snapshot()) } pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> { @@ -38,6 +40,16 @@ impl<'a> Channels<'a> { let channels = tx.channels().all(resume_point).await?; tx.commit().await?; + let channels = channels + .into_iter() + .filter_map(|channel| { + channel + .events() + .filter(Sequence::up_to(resume_point)) + .collect() + }) + .collect(); + Ok(channels) } @@ -51,14 +63,21 @@ impl<'a> Channels<'a> { let mut events = Vec::with_capacity(expired.len()); for channel in expired { let deleted = tx.sequence().next(relative_to).await?; - let event = tx.channels().delete(&channel, &deleted).await?; - events.push(event); + let channel = tx.channels().delete(&channel, &deleted).await?; + events.push( + channel + .events() + .filter(Sequence::start_from(deleted.sequence)), + ); } tx.commit().await?; - for event in events { - self.events.broadcast(&event); + for event in events + .into_iter() + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + { + self.events.broadcast(event); } Ok(()) |
