summaryrefslogtreecommitdiff
path: root/src/events/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/app.rs')
-rw-r--r--src/events/app.rs40
1 files changed, 25 insertions, 15 deletions
diff --git a/src/events/app.rs b/src/events/app.rs
index 134e86a..03f3ee6 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -55,14 +55,35 @@ impl<'a> Events<'a> {
Ok(event)
}
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, expire after 90 days.
+ let expire_at = relative_to.to_owned() - TimeDelta::days(90);
+
+ let mut tx = self.db.begin().await?;
+ let expired = tx.message_events().expired(&expire_at).await?;
+
+ let mut events = Vec::with_capacity(expired.len());
+ for (channel, message) in expired {
+ let event = tx
+ .message_events()
+ .delete_expired(&channel, &message, relative_to)
+ .await?;
+ events.push(event);
+ }
+
+ tx.commit().await?;
+
+ for event in events {
+ self.broadcaster.broadcast(&event);
+ }
+
+ Ok(())
+ }
+
pub async fn subscribe(
&self,
- subscribed_at: &DateTime,
resume_at: ResumePoint,
) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> {
- // Somewhat arbitrarily, expire after 90 days.
- let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);
-
let mut tx = self.db.begin().await?;
let channels = tx.channels().all().await?;
@@ -81,8 +102,6 @@ impl<'a> Events<'a> {
// querying the DB. We'll prune out duplicates later.
let live_messages = self.broadcaster.subscribe();
- tx.message_events().expire(&expire_at).await?;
-
let mut replays = BTreeMap::new();
let mut resume_live_at = resume_at.clone();
for channel in channels {
@@ -107,9 +126,6 @@ impl<'a> Events<'a> {
// * resume is redundant with the resume_at argument to
// `tx.broadcasts().replay(…)`.
let live_messages = live_messages
- // Sure, it's temporally improbable that we'll ever skip a message
- // that's 90 days old, but there's no reason not to be thorough.
- .filter(Self::skip_expired(&expire_at))
// Filtering on the broadcast resume point filters out messages
// before resume_at, and filters out messages duplicated from
// stored_messages.
@@ -134,12 +150,6 @@ impl<'a> Events<'a> {
) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
move |event| future::ready(resume_at.not_after(event))
}
- fn skip_expired(
- expire_at: &DateTime,
- ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
- let expire_at = expire_at.to_owned();
- move |event| future::ready(expire_at < event.at)
- }
}
#[derive(Debug, thiserror::Error)]