diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-28 20:13:10 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-28 20:13:10 -0400 |
| commit | 72efedf8e96ca6e159ce6146809ee6d3a9e5a0e7 (patch) | |
| tree | 78e47ae3ed1ac00cc08e95686970900acd98a69c /src | |
| parent | 155f6f2556b21e6b25afe096b19adcde1255c598 (diff) | |
Clean up use of bare tuple as a vector element for ResumePoint.
Diffstat (limited to 'src')
| -rw-r--r-- | src/events/app.rs | 7 | ||||
| -rw-r--r-- | src/events/types.rs | 47 |
2 files changed, 20 insertions, 34 deletions
diff --git a/src/events/app.rs b/src/events/app.rs index 5162c67..0cdc641 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -112,7 +112,7 @@ impl<'a> Events<'a> { .await?; if let Some(last) = replay.last() { - resume_live_at.advance(&channel.id, last.sequence); + resume_live_at.advance(last); } replays.insert(channel.id.clone(), replay); @@ -135,10 +135,9 @@ impl<'a> Events<'a> { Ok(created_events.chain(replay).chain(live_messages).scan( resume_at, |resume_point, event| { - let channel = &event.channel_id(); match event.data { - types::ChannelEventData::Deleted(_) => resume_point.forget(channel), - _ => resume_point.advance(channel, event.sequence), + types::ChannelEventData::Deleted(_) => resume_point.forget(&event), + _ => resume_point.advance(&event), } let event = types::ResumableEvent(resume_point.clone(), event); diff --git a/src/events/types.rs b/src/events/types.rs index 966842d..d954512 100644 --- a/src/events/types.rs +++ b/src/events/types.rs @@ -33,31 +33,29 @@ impl Sequence { } } -// For the purposes of event replay, an "event ID" is a vector of per-channel -// sequence numbers. Replay will start with messages whose sequence number in -// its channel is higher than the sequence in the event ID, or if the channel -// is not listed in the event ID, then at the beginning. +// For the purposes of event replay, a resume point is a vector of resume +// elements. A resume element associates a channel (by ID) with the latest event +// seen in that channel so far. Replaying the event stream can restart at a +// predictable point - hence the name. These values can be serialized and sent +// to the client as JSON dicts, then rehydrated to recover the resume point at a +// later time. // // Using a sorted map ensures that there is a canonical representation for -// each event ID. +// each resume point. #[derive(Clone, Debug, Default, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize)] #[serde(transparent)] pub struct ResumePoint(BTreeMap<channel::Id, Sequence>); impl ResumePoint { - pub fn singleton(channel: &channel::Id, sequence: Sequence) -> Self { - let mut vector = Self::default(); - vector.advance(channel, sequence); - vector - } - - pub fn advance(&mut self, channel: &channel::Id, sequence: Sequence) { + pub fn advance<'e>(&mut self, event: impl Into<ResumeElement<'e>>) { let Self(elements) = self; + let ResumeElement(channel, sequence) = event.into(); elements.insert(channel.clone(), sequence); } - pub fn forget(&mut self, channel: &channel::Id) { + pub fn forget<'e>(&mut self, event: impl Into<ResumeElement<'e>>) { let Self(elements) = self; + let ResumeElement(channel, _) = event.into(); elements.remove(channel); } @@ -66,9 +64,9 @@ impl ResumePoint { elements.get(channel).copied() } - pub fn not_after(&self, event: impl ResumeElement) -> bool { + pub fn not_after<'e>(&self, event: impl Into<ResumeElement<'e>>) -> bool { let Self(elements) = self; - let (channel, sequence) = event.element(); + let ResumeElement(channel, sequence) = event.into(); elements .get(channel) @@ -76,18 +74,7 @@ impl ResumePoint { } } -pub trait ResumeElement { - fn element(&self) -> (&channel::Id, Sequence); -} - -impl<T> ResumeElement for &T -where - T: ResumeElement, -{ - fn element(&self) -> (&channel::Id, Sequence) { - (*self).element() - } -} +pub struct ResumeElement<'i>(&'i channel::Id, Sequence); #[derive(Clone, Debug)] pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent); @@ -120,9 +107,9 @@ impl ChannelEvent { } } -impl ResumeElement for ChannelEvent { - fn element(&self) -> (&channel::Id, Sequence) { - (self.channel_id(), self.sequence) +impl<'c> From<&'c ChannelEvent> for ResumeElement<'c> { + fn from(event: &'c ChannelEvent) -> Self { + Self(event.channel_id(), event.sequence) } } |
