diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-20 16:09:35 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-20 16:42:25 -0400 |
| commit | aafdeb9ffaf9a993ca4462b3422667e04469b2e3 (patch) | |
| tree | ef2c0b8a8719a3ad511c80b38b3669d9f0c49157 /src/channel | |
| parent | 8fe54f09aad3121d1cb9418087e46dc3a617463a (diff) | |
Expire messages after 90 days.
This is intended to manage storage growth. A community with broadly steady traffic will now reach a steady state (ish) where the amount of storage in use stays within a steady band.
The 90 day threshold is a spitball; this should be made configurable for the community's needs.
I've also hoisted expiry out into the `app` classes, to reduce the amount of non-database work repo types are doing. This should make it easier to make expiry configurable later on.
Includes incidental cleanup and style changes.
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 41 | ||||
| -rw-r--r-- | src/channel/repo/broadcast.rs | 14 |
2 files changed, 41 insertions, 14 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 5aabe31..2f37878 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,6 +1,7 @@ use std::collections::{hash_map::Entry, HashMap}; use std::sync::{Arc, Mutex, MutexGuard}; +use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, @@ -73,19 +74,12 @@ impl<'a> Channels<'a> { pub async fn events( &self, channel: &channel::Id, + subscribed_at: &DateTime, resume_at: Option<&DateTime>, ) -> Result<impl Stream<Item = broadcast::Message>, EventsError> { - fn skip_stale( - resume_at: Option<&DateTime>, - ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { - let resume_at = resume_at.cloned(); - move |msg| { - future::ready(match resume_at { - None => false, - Some(resume_at) => msg.sent_at <= resume_at, - }) - } - } + // Somewhat arbitrarily, expire after 90 days. + let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); + let mut tx = self .db .begin() @@ -96,8 +90,10 @@ impl<'a> Channels<'a> { let live_messages = self .broadcaster .listen(&channel.id) - .skip_while(skip_stale(resume_at)); + .filter(Self::skip_stale(resume_at)) + .filter(Self::skip_expired(&expire_at)); + tx.broadcast().expire(&expire_at).await?; let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; tx.commit().await?; @@ -105,6 +101,24 @@ impl<'a> Channels<'a> { Ok(stored_messages.chain(live_messages)) } + + fn skip_stale( + resume_at: Option<&DateTime>, + ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { + let resume_at = resume_at.cloned(); + move |msg| { + future::ready(match resume_at { + None => true, + Some(resume_at) => msg.sent_at > resume_at, + }) + } + } + fn skip_expired( + expire_at: &DateTime, + ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { + let expire_at = expire_at.to_owned(); + move |msg| future::ready(msg.sent_at > expire_at) + } } #[derive(Debug, thiserror::Error)] @@ -200,8 +214,7 @@ impl Broadcaster { // should always hold. // // See also <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854>. - debug_assert!(r.is_ok()); - r.unwrap() + r.expect("after filtering, only `Ok` messages should remain") }) } diff --git a/src/channel/repo/broadcast.rs b/src/channel/repo/broadcast.rs index ff16cd0..182203a 100644 --- a/src/channel/repo/broadcast.rs +++ b/src/channel/repo/broadcast.rs @@ -68,6 +68,20 @@ impl<'c> Broadcast<'c> { Ok(message) } + pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { + sqlx::query!( + r#" + delete from message + where sent_at < $1 + "#, + expire_at, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) + } + pub async fn replay( &mut self, channel: &Channel, |
