summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-20 16:09:35 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-20 16:42:25 -0400
commitaafdeb9ffaf9a993ca4462b3422667e04469b2e3 (patch)
treeef2c0b8a8719a3ad511c80b38b3669d9f0c49157 /src/channel/app.rs
parent8fe54f09aad3121d1cb9418087e46dc3a617463a (diff)
Expire messages after 90 days.
This is intended to manage storage growth. A community with broadly steady traffic will now reach a steady state (ish) where the amount of storage in use stays within a steady band. The 90 day threshold is a spitball; this should be made configurable for the community's needs. I've also hoisted expiry out into the `app` classes, to reduce the amount of non-database work repo types are doing. This should make it easier to make expiry configurable later on. Includes incidental cleanup and style changes.
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs41
1 files changed, 27 insertions, 14 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 5aabe31..2f37878 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,6 +1,7 @@
use std::collections::{hash_map::Entry, HashMap};
use std::sync::{Arc, Mutex, MutexGuard};
+use chrono::TimeDelta;
use futures::{
future,
stream::{self, StreamExt as _},
@@ -73,19 +74,12 @@ impl<'a> Channels<'a> {
pub async fn events(
&self,
channel: &channel::Id,
+ subscribed_at: &DateTime,
resume_at: Option<&DateTime>,
) -> Result<impl Stream<Item = broadcast::Message>, EventsError> {
- fn skip_stale(
- resume_at: Option<&DateTime>,
- ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
- let resume_at = resume_at.cloned();
- move |msg| {
- future::ready(match resume_at {
- None => false,
- Some(resume_at) => msg.sent_at <= resume_at,
- })
- }
- }
+ // Somewhat arbitrarily, expire after 90 days.
+ let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);
+
let mut tx = self
.db
.begin()
@@ -96,8 +90,10 @@ impl<'a> Channels<'a> {
let live_messages = self
.broadcaster
.listen(&channel.id)
- .skip_while(skip_stale(resume_at));
+ .filter(Self::skip_stale(resume_at))
+ .filter(Self::skip_expired(&expire_at));
+ tx.broadcast().expire(&expire_at).await?;
let stored_messages = tx.broadcast().replay(&channel, resume_at).await?;
tx.commit().await?;
@@ -105,6 +101,24 @@ impl<'a> Channels<'a> {
Ok(stored_messages.chain(live_messages))
}
+
+ fn skip_stale(
+ resume_at: Option<&DateTime>,
+ ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
+ let resume_at = resume_at.cloned();
+ move |msg| {
+ future::ready(match resume_at {
+ None => true,
+ Some(resume_at) => msg.sent_at > resume_at,
+ })
+ }
+ }
+ fn skip_expired(
+ expire_at: &DateTime,
+ ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
+ let expire_at = expire_at.to_owned();
+ move |msg| future::ready(msg.sent_at > expire_at)
+ }
}
#[derive(Debug, thiserror::Error)]
@@ -200,8 +214,7 @@ impl Broadcaster {
// should always hold.
//
// See also <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854>.
- debug_assert!(r.is_ok());
- r.unwrap()
+ r.expect("after filtering, only `Ok` messages should remain")
})
}