summaryrefslogtreecommitdiff
path: root/src/boot/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/boot/app.rs')
-rw-r--r--src/boot/app.rs54
1 files changed, 54 insertions, 0 deletions
diff --git a/src/boot/app.rs b/src/boot/app.rs
new file mode 100644
index 0000000..fc84b3a
--- /dev/null
+++ b/src/boot/app.rs
@@ -0,0 +1,54 @@
+use sqlx::sqlite::SqlitePool;
+
+use super::{Channel, Snapshot};
+use crate::{
+ channel::repo::Provider as _, event::repo::Provider as _, message::repo::Provider as _,
+};
+
+pub struct Boot<'a> {
+ db: &'a SqlitePool,
+}
+
+impl<'a> Boot<'a> {
+ pub const fn new(db: &'a SqlitePool) -> Self {
+ Self { db }
+ }
+
+ pub async fn snapshot(&self) -> Result<Snapshot, sqlx::Error> {
+ let mut tx = self.db.begin().await?;
+ let resume_point = tx.sequence().current().await?;
+ let channels = tx.channels().all(resume_point.into()).await?;
+
+ let channels = {
+ let mut snapshots = Vec::with_capacity(channels.len());
+
+ let channels = channels.into_iter().filter_map(|channel| {
+ channel
+ .as_of(resume_point)
+ .map(|snapshot| (channel, snapshot))
+ });
+
+ for (channel, snapshot) in channels {
+ let messages = tx
+ .messages()
+ .in_channel(&channel, resume_point.into())
+ .await?;
+
+ let messages = messages
+ .into_iter()
+ .filter_map(|message| message.as_of(resume_point));
+
+ snapshots.push(Channel::new(snapshot, messages));
+ }
+
+ snapshots
+ };
+
+ tx.commit().await?;
+
+ Ok(Snapshot {
+ resume_point,
+ channels,
+ })
+ }
+}