summaryrefslogtreecommitdiff
path: root/src/event/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/app.rs')
-rw-r--r--src/event/app.rs12
1 files changed, 11 insertions, 1 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
index 141037d..951ce25 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -9,6 +9,7 @@ use sqlx::sqlite::SqlitePool;
use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced};
use crate::{
channel::{self, repo::Provider as _},
+ login::{self, repo::Provider as _},
message::{self, repo::Provider as _},
};
@@ -33,6 +34,14 @@ impl<'a> Events<'a> {
let mut tx = self.db.begin().await?;
+ let logins = tx.logins().replay(resume_at).await?;
+ let login_events = logins
+ .iter()
+ .map(login::History::events)
+ .kmerge_by(Sequence::merge)
+ .filter(Sequence::after(resume_at))
+ .map(Event::from);
+
let channels = tx.channels().replay(resume_at).await?;
let channel_events = channels
.iter()
@@ -49,7 +58,8 @@ impl<'a> Events<'a> {
.filter(Sequence::after(resume_at))
.map(Event::from);
- let replay_events = channel_events
+ let replay_events = login_events
+ .merge_by(channel_events, Sequence::merge)
.merge_by(message_events, Sequence::merge)
.collect::<Vec<_>>();
let resume_live_at = replay_events.last().map(Sequenced::sequence);