summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-28 20:13:10 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-28 20:13:10 -0400
commit72efedf8e96ca6e159ce6146809ee6d3a9e5a0e7 (patch)
tree78e47ae3ed1ac00cc08e95686970900acd98a69c /src
parent155f6f2556b21e6b25afe096b19adcde1255c598 (diff)
Clean up use of bare tuple as a vector element for ResumePoint.
Diffstat (limited to 'src')
-rw-r--r--src/events/app.rs7
-rw-r--r--src/events/types.rs47
2 files changed, 20 insertions, 34 deletions
diff --git a/src/events/app.rs b/src/events/app.rs
index 5162c67..0cdc641 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -112,7 +112,7 @@ impl<'a> Events<'a> {
.await?;
if let Some(last) = replay.last() {
- resume_live_at.advance(&channel.id, last.sequence);
+ resume_live_at.advance(last);
}
replays.insert(channel.id.clone(), replay);
@@ -135,10 +135,9 @@ impl<'a> Events<'a> {
Ok(created_events.chain(replay).chain(live_messages).scan(
resume_at,
|resume_point, event| {
- let channel = &event.channel_id();
match event.data {
- types::ChannelEventData::Deleted(_) => resume_point.forget(channel),
- _ => resume_point.advance(channel, event.sequence),
+ types::ChannelEventData::Deleted(_) => resume_point.forget(&event),
+ _ => resume_point.advance(&event),
}
let event = types::ResumableEvent(resume_point.clone(), event);
diff --git a/src/events/types.rs b/src/events/types.rs
index 966842d..d954512 100644
--- a/src/events/types.rs
+++ b/src/events/types.rs
@@ -33,31 +33,29 @@ impl Sequence {
}
}
-// For the purposes of event replay, an "event ID" is a vector of per-channel
-// sequence numbers. Replay will start with messages whose sequence number in
-// its channel is higher than the sequence in the event ID, or if the channel
-// is not listed in the event ID, then at the beginning.
+// For the purposes of event replay, a resume point is a vector of resume
+// elements. A resume element associates a channel (by ID) with the latest event
+// seen in that channel so far. Replaying the event stream can restart at a
+// predictable point - hence the name. These values can be serialized and sent
+// to the client as JSON dicts, then rehydrated to recover the resume point at a
+// later time.
//
// Using a sorted map ensures that there is a canonical representation for
-// each event ID.
+// each resume point.
#[derive(Clone, Debug, Default, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize)]
#[serde(transparent)]
pub struct ResumePoint(BTreeMap<channel::Id, Sequence>);
impl ResumePoint {
- pub fn singleton(channel: &channel::Id, sequence: Sequence) -> Self {
- let mut vector = Self::default();
- vector.advance(channel, sequence);
- vector
- }
-
- pub fn advance(&mut self, channel: &channel::Id, sequence: Sequence) {
+ pub fn advance<'e>(&mut self, event: impl Into<ResumeElement<'e>>) {
let Self(elements) = self;
+ let ResumeElement(channel, sequence) = event.into();
elements.insert(channel.clone(), sequence);
}
- pub fn forget(&mut self, channel: &channel::Id) {
+ pub fn forget<'e>(&mut self, event: impl Into<ResumeElement<'e>>) {
let Self(elements) = self;
+ let ResumeElement(channel, _) = event.into();
elements.remove(channel);
}
@@ -66,9 +64,9 @@ impl ResumePoint {
elements.get(channel).copied()
}
- pub fn not_after(&self, event: impl ResumeElement) -> bool {
+ pub fn not_after<'e>(&self, event: impl Into<ResumeElement<'e>>) -> bool {
let Self(elements) = self;
- let (channel, sequence) = event.element();
+ let ResumeElement(channel, sequence) = event.into();
elements
.get(channel)
@@ -76,18 +74,7 @@ impl ResumePoint {
}
}
-pub trait ResumeElement {
- fn element(&self) -> (&channel::Id, Sequence);
-}
-
-impl<T> ResumeElement for &T
-where
- T: ResumeElement,
-{
- fn element(&self) -> (&channel::Id, Sequence) {
- (*self).element()
- }
-}
+pub struct ResumeElement<'i>(&'i channel::Id, Sequence);
#[derive(Clone, Debug)]
pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent);
@@ -120,9 +107,9 @@ impl ChannelEvent {
}
}
-impl ResumeElement for ChannelEvent {
- fn element(&self) -> (&channel::Id, Sequence) {
- (self.channel_id(), self.sequence)
+impl<'c> From<&'c ChannelEvent> for ResumeElement<'c> {
+ fn from(event: &'c ChannelEvent) -> Self {
+ Self(event.channel_id(), event.sequence)
}
}