diff options
Diffstat (limited to 'src/channel/app.rs')
| -rw-r--r-- | src/channel/app.rs | 57 |
1 files changed, 41 insertions, 16 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 3c92d76..e314792 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -77,16 +77,11 @@ impl<'a> Channels<'a> { &self, channel: &channel::Id, subscribed_at: &DateTime, - resume_at: Option<&str>, + resume_at: Option<broadcast::Sequence>, ) -> Result<impl Stream<Item = broadcast::Message> + std::fmt::Debug, EventsError> { // Somewhat arbitrarily, expire after 90 days. let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); - let resume_at = resume_at - .map(chrono::DateTime::parse_from_rfc3339) - .transpose()? - .map(|resume_at| resume_at.to_utc()); - let mut tx = self.db.begin().await?; let channel = tx .channels() @@ -94,29 +89,59 @@ impl<'a> Channels<'a> { .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let live_messages = self - .broadcaster - .listen(&channel.id) - .filter(Self::skip_stale(resume_at.as_ref())) - .filter(Self::skip_expired(&expire_at)); + // Subscribe before retrieving, to catch messages broadcast while we're + // querying the DB. We'll prune out duplicates later. + let live_messages = self.broadcaster.listen(&channel.id); tx.broadcast().expire(&expire_at).await?; - let stored_messages = tx.broadcast().replay(&channel, resume_at.as_ref()).await?; + let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; tx.commit().await?; + let resume_broadcast_at = stored_messages + .last() + .map(|message| message.sequence) + .or(resume_at); + + // This should always be the case, up to integer rollover, primarily + // because every message in stored_messages has a sequence not less + // than `resume_at`, or `resume_at` is None. We use the last message + // (if any) to decide when to resume the `live_messages` stream. + // + // It probably simplifies to assert!(resume_at <= resume_broadcast_at), but + // this form captures more of the reasoning. + assert!( + (resume_at.is_none() && resume_broadcast_at.is_none()) + || (stored_messages.is_empty() && resume_at == resume_broadcast_at) + || resume_at < resume_broadcast_at + ); + + // no skip_expired or resume transforms for stored_messages, as it's + // constructed not to contain messages meeting either criterion. + // + // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; + // * resume is redundant with the resume_at argument to + // `tx.broadcasts().replay(…)`. let stored_messages = stream::iter(stored_messages); + let live_messages = live_messages + // Sure, it's temporally improbable that we'll ever skip a message + // that's 90 days old, but there's no reason not to be thorough. + .filter(Self::skip_expired(&expire_at)) + // Filtering on the broadcast resume point filters out messages + // before resume_at, and filters out messages duplicated from + // stored_messages. + .filter(Self::resume(resume_broadcast_at)); Ok(stored_messages.chain(live_messages)) } - fn skip_stale( - resume_at: Option<&DateTime>, + fn resume( + resume_at: Option<broadcast::Sequence>, ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { - let resume_at = resume_at.cloned(); + let resume_at = resume_at; move |msg| { future::ready(match resume_at { None => true, - Some(resume_at) => msg.sent_at > resume_at, + Some(resume_at) => msg.sequence > resume_at, }) } } |
