diff options
Diffstat (limited to 'src/channel/app.rs')
| -rw-r--r-- | src/channel/app.rs | 41 |
1 files changed, 27 insertions, 14 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 5aabe31..2f37878 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,6 +1,7 @@ use std::collections::{hash_map::Entry, HashMap}; use std::sync::{Arc, Mutex, MutexGuard}; +use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, @@ -73,19 +74,12 @@ impl<'a> Channels<'a> { pub async fn events( &self, channel: &channel::Id, + subscribed_at: &DateTime, resume_at: Option<&DateTime>, ) -> Result<impl Stream<Item = broadcast::Message>, EventsError> { - fn skip_stale( - resume_at: Option<&DateTime>, - ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { - let resume_at = resume_at.cloned(); - move |msg| { - future::ready(match resume_at { - None => false, - Some(resume_at) => msg.sent_at <= resume_at, - }) - } - } + // Somewhat arbitrarily, expire after 90 days. + let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); + let mut tx = self .db .begin() @@ -96,8 +90,10 @@ impl<'a> Channels<'a> { let live_messages = self .broadcaster .listen(&channel.id) - .skip_while(skip_stale(resume_at)); + .filter(Self::skip_stale(resume_at)) + .filter(Self::skip_expired(&expire_at)); + tx.broadcast().expire(&expire_at).await?; let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; tx.commit().await?; @@ -105,6 +101,24 @@ impl<'a> Channels<'a> { Ok(stored_messages.chain(live_messages)) } + + fn skip_stale( + resume_at: Option<&DateTime>, + ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { + let resume_at = resume_at.cloned(); + move |msg| { + future::ready(match resume_at { + None => true, + Some(resume_at) => msg.sent_at > resume_at, + }) + } + } + fn skip_expired( + expire_at: &DateTime, + ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { + let expire_at = expire_at.to_owned(); + move |msg| future::ready(msg.sent_at > expire_at) + } } #[derive(Debug, thiserror::Error)] @@ -200,8 +214,7 @@ impl Broadcaster { // should always hold. // // See also <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854>. - debug_assert!(r.is_ok()); - r.unwrap() + r.expect("after filtering, only `Ok` messages should remain") }) } |
