From 155f6f2556b21e6b25afe096b19adcde1255c598 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Sep 2024 23:46:55 -0400 Subject: Expire channels, too. --- src/events/app.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'src/events/app.rs') diff --git a/src/events/app.rs b/src/events/app.rs index 03f3ee6..5162c67 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -64,9 +64,10 @@ impl<'a> Events<'a> { let mut events = Vec::with_capacity(expired.len()); for (channel, message) in expired { + let sequence = tx.message_events().assign_sequence(&channel).await?; let event = tx .message_events() - .delete_expired(&channel, &message, relative_to) + .delete_expired(&channel, &message, sequence, relative_to) .await?; events.push(event); } @@ -134,9 +135,11 @@ impl<'a> Events<'a> { Ok(created_events.chain(replay).chain(live_messages).scan( resume_at, |resume_point, event| { - let channel = &event.channel.id; - let sequence = event.sequence; - resume_point.advance(channel, sequence); + let channel = &event.channel_id(); + match event.data { + types::ChannelEventData::Deleted(_) => resume_point.forget(channel), + _ => resume_point.advance(channel, event.sequence), + } let event = types::ResumableEvent(resume_point.clone(), event); -- cgit v1.2.3