summaryrefslogtreecommitdiff
path: root/src/events/types.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/types.rs')
-rw-r--r--src/events/types.rs79
1 files changed, 4 insertions, 75 deletions
diff --git a/src/events/types.rs b/src/events/types.rs
index d954512..aca3af4 100644
--- a/src/events/types.rs
+++ b/src/events/types.rs
@@ -1,84 +1,13 @@
-use std::collections::BTreeMap;
-
use crate::{
clock::DateTime,
repo::{
channel::{self, Channel},
login::Login,
message,
+ sequence::Sequence,
},
};
-#[derive(
- Debug,
- Default,
- Eq,
- Ord,
- PartialEq,
- PartialOrd,
- Clone,
- Copy,
- serde::Serialize,
- serde::Deserialize,
- sqlx::Type,
-)]
-#[serde(transparent)]
-#[sqlx(transparent)]
-pub struct Sequence(i64);
-
-impl Sequence {
- pub fn next(self) -> Self {
- let Self(current) = self;
- Self(current + 1)
- }
-}
-
-// For the purposes of event replay, a resume point is a vector of resume
-// elements. A resume element associates a channel (by ID) with the latest event
-// seen in that channel so far. Replaying the event stream can restart at a
-// predictable point - hence the name. These values can be serialized and sent
-// to the client as JSON dicts, then rehydrated to recover the resume point at a
-// later time.
-//
-// Using a sorted map ensures that there is a canonical representation for
-// each resume point.
-#[derive(Clone, Debug, Default, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize)]
-#[serde(transparent)]
-pub struct ResumePoint(BTreeMap<channel::Id, Sequence>);
-
-impl ResumePoint {
- pub fn advance<'e>(&mut self, event: impl Into<ResumeElement<'e>>) {
- let Self(elements) = self;
- let ResumeElement(channel, sequence) = event.into();
- elements.insert(channel.clone(), sequence);
- }
-
- pub fn forget<'e>(&mut self, event: impl Into<ResumeElement<'e>>) {
- let Self(elements) = self;
- let ResumeElement(channel, _) = event.into();
- elements.remove(channel);
- }
-
- pub fn get(&self, channel: &channel::Id) -> Option<Sequence> {
- let Self(elements) = self;
- elements.get(channel).copied()
- }
-
- pub fn not_after<'e>(&self, event: impl Into<ResumeElement<'e>>) -> bool {
- let Self(elements) = self;
- let ResumeElement(channel, sequence) = event.into();
-
- elements
- .get(channel)
- .map_or(true, |resume_at| resume_at < &sequence)
- }
-}
-
-pub struct ResumeElement<'i>(&'i channel::Id, Sequence);
-
-#[derive(Clone, Debug)]
-pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent);
-
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct ChannelEvent {
#[serde(skip)]
@@ -92,7 +21,7 @@ impl ChannelEvent {
pub fn created(channel: Channel) -> Self {
Self {
at: channel.created_at,
- sequence: Sequence::default(),
+ sequence: channel.created_sequence,
data: CreatedEvent { channel }.into(),
}
}
@@ -107,9 +36,9 @@ impl ChannelEvent {
}
}
-impl<'c> From<&'c ChannelEvent> for ResumeElement<'c> {
+impl<'c> From<&'c ChannelEvent> for Sequence {
fn from(event: &'c ChannelEvent) -> Self {
- Self(event.channel_id(), event.sequence)
+ event.sequence
}
}