summaryrefslogtreecommitdiff
path: root/src/events/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-27 23:03:46 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-28 01:00:12 -0400
commit60b711c844f8624348d5d1dac3a625532a8e2a82 (patch)
treea667cfa3833046425a87ec03c700d6124af70e4e /src/events/app.rs
parent08c3a6e77a3f61ffc9643a5e1f840df9078d0b36 (diff)
Delete expired messages out of band.
Trying to reliably do expiry mid-request was causing some anomalies: * Creating a channel with a dup name would fail, then succeed after listing channels. It was very hard to reason about which operations needed to trigger expiry, to fix this "correctly," so now expiry runs on every request.
Diffstat (limited to 'src/events/app.rs')
-rw-r--r--src/events/app.rs40
1 files changed, 25 insertions, 15 deletions
diff --git a/src/events/app.rs b/src/events/app.rs
index 134e86a..03f3ee6 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -55,14 +55,35 @@ impl<'a> Events<'a> {
Ok(event)
}
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, expire after 90 days.
+ let expire_at = relative_to.to_owned() - TimeDelta::days(90);
+
+ let mut tx = self.db.begin().await?;
+ let expired = tx.message_events().expired(&expire_at).await?;
+
+ let mut events = Vec::with_capacity(expired.len());
+ for (channel, message) in expired {
+ let event = tx
+ .message_events()
+ .delete_expired(&channel, &message, relative_to)
+ .await?;
+ events.push(event);
+ }
+
+ tx.commit().await?;
+
+ for event in events {
+ self.broadcaster.broadcast(&event);
+ }
+
+ Ok(())
+ }
+
pub async fn subscribe(
&self,
- subscribed_at: &DateTime,
resume_at: ResumePoint,
) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> {
- // Somewhat arbitrarily, expire after 90 days.
- let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);
-
let mut tx = self.db.begin().await?;
let channels = tx.channels().all().await?;
@@ -81,8 +102,6 @@ impl<'a> Events<'a> {
// querying the DB. We'll prune out duplicates later.
let live_messages = self.broadcaster.subscribe();
- tx.message_events().expire(&expire_at).await?;
-
let mut replays = BTreeMap::new();
let mut resume_live_at = resume_at.clone();
for channel in channels {
@@ -107,9 +126,6 @@ impl<'a> Events<'a> {
// * resume is redundant with the resume_at argument to
// `tx.broadcasts().replay(…)`.
let live_messages = live_messages
- // Sure, it's temporally improbable that we'll ever skip a message
- // that's 90 days old, but there's no reason not to be thorough.
- .filter(Self::skip_expired(&expire_at))
// Filtering on the broadcast resume point filters out messages
// before resume_at, and filters out messages duplicated from
// stored_messages.
@@ -134,12 +150,6 @@ impl<'a> Events<'a> {
) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
move |event| future::ready(resume_at.not_after(event))
}
- fn skip_expired(
- expire_at: &DateTime,
- ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
- let expire_at = expire_at.to_owned();
- move |event| future::ready(expire_at < event.at)
- }
}
#[derive(Debug, thiserror::Error)]