From aafdeb9ffaf9a993ca4462b3422667e04469b2e3 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 20 Sep 2024 16:09:35 -0400 Subject: Expire messages after 90 days. This is intended to manage storage growth. A community with broadly steady traffic will now reach a steady state (ish) where the amount of storage in use stays within a steady band. The 90 day threshold is a spitball; this should be made configurable for the community's needs. I've also hoisted expiry out into the `app` classes, to reduce the amount of non-database work repo types are doing. This should make it easier to make expiry configurable later on. Includes incidental cleanup and style changes. --- src/channel/app.rs | 41 +++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) (limited to 'src/channel/app.rs') diff --git a/src/channel/app.rs b/src/channel/app.rs index 5aabe31..2f37878 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,6 +1,7 @@ use std::collections::{hash_map::Entry, HashMap}; use std::sync::{Arc, Mutex, MutexGuard}; +use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, @@ -73,19 +74,12 @@ impl<'a> Channels<'a> { pub async fn events( &self, channel: &channel::Id, + subscribed_at: &DateTime, resume_at: Option<&DateTime>, ) -> Result, EventsError> { - fn skip_stale( - resume_at: Option<&DateTime>, - ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready { - let resume_at = resume_at.cloned(); - move |msg| { - future::ready(match resume_at { - None => false, - Some(resume_at) => msg.sent_at <= resume_at, - }) - } - } + // Somewhat arbitrarily, expire after 90 days. + let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); + let mut tx = self .db .begin() @@ -96,8 +90,10 @@ impl<'a> Channels<'a> { let live_messages = self .broadcaster .listen(&channel.id) - .skip_while(skip_stale(resume_at)); + .filter(Self::skip_stale(resume_at)) + .filter(Self::skip_expired(&expire_at)); + tx.broadcast().expire(&expire_at).await?; let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; tx.commit().await?; @@ -105,6 +101,24 @@ impl<'a> Channels<'a> { Ok(stored_messages.chain(live_messages)) } + + fn skip_stale( + resume_at: Option<&DateTime>, + ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready { + let resume_at = resume_at.cloned(); + move |msg| { + future::ready(match resume_at { + None => true, + Some(resume_at) => msg.sent_at > resume_at, + }) + } + } + fn skip_expired( + expire_at: &DateTime, + ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready { + let expire_at = expire_at.to_owned(); + move |msg| future::ready(msg.sent_at > expire_at) + } } #[derive(Debug, thiserror::Error)] @@ -200,8 +214,7 @@ impl Broadcaster { // should always hold. // // See also . - debug_assert!(r.is_ok()); - r.unwrap() + r.expect("after filtering, only `Ok` messages should remain") }) } -- cgit v1.2.3