summaryrefslogtreecommitdiff
path: root/src/boot/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2025-06-18 23:33:02 -0400
committerOwen Jacobson <owen@grimoire.ca>2025-06-20 22:27:35 -0400
commit4b522c804db8155f74a734c95ed962d996b2c692 (patch)
treef5369ba821ba22cdd307b7a3a411e95ad1a1e896 /src/boot/app.rs
parent057bbef5f37a4051615ad23661a0b4853b61162e (diff)
Include historical events in the boot response.
The returned events are all events up to and including the `resume_point` in the same response. If combined with the events from `/api/events?resume_point=x`, using the same `resume_point`, the client will have a complete event history, less any events from histories that have been purged.
Diffstat (limited to 'src/boot/app.rs')
-rw-r--r--src/boot/app.rs50
1 files changed, 39 insertions, 11 deletions
diff --git a/src/boot/app.rs b/src/boot/app.rs
index f531afe..690bcf4 100644
--- a/src/boot/app.rs
+++ b/src/boot/app.rs
@@ -1,10 +1,11 @@
+use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;
use super::Snapshot;
use crate::{
channel::{self, repo::Provider as _},
- event::repo::Provider as _,
- message::repo::Provider as _,
+ event::{Event, Sequence, repo::Provider as _},
+ message::{self, repo::Provider as _},
name,
user::{self, repo::Provider as _},
};
@@ -22,32 +23,59 @@ impl<'a> Boot<'a> {
let mut tx = self.db.begin().await?;
let resume_point = tx.sequence().current().await?;
- let users = tx.users().all(resume_point).await?;
- let channels = tx.channels().all(resume_point).await?;
- let messages = tx.messages().all(resume_point).await?;
+ let user_histories = tx.users().all(resume_point).await?;
+ let channel_histories = tx.channels().all(resume_point).await?;
+ let message_histories = tx.messages().all(resume_point).await?;
tx.commit().await?;
- let users = users
- .into_iter()
+ let users = user_histories
+ .iter()
.filter_map(|user| user.as_of(resume_point))
.collect();
- let channels = channels
- .into_iter()
+ let channels = channel_histories
+ .iter()
.filter_map(|channel| channel.as_of(resume_point))
.collect();
- let messages = messages
- .into_iter()
+ let messages = message_histories
+ .iter()
.filter_map(|message| message.as_of(resume_point))
.collect();
+ let user_events = user_histories
+ .iter()
+ .map(user::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::up_to(resume_point))
+ .map(Event::from);
+
+ let channel_events = channel_histories
+ .iter()
+ .map(channel::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::up_to(resume_point))
+ .map(Event::from);
+
+ let message_events = message_histories
+ .iter()
+ .map(message::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::up_to(resume_point))
+ .map(Event::from);
+
+ let events = user_events
+ .merge_by(channel_events, Sequence::merge)
+ .merge_by(message_events, Sequence::merge)
+ .collect();
+
Ok(Snapshot {
resume_point,
users,
channels,
messages,
+ events,
})
}
}