diff options
Diffstat (limited to 'src/events/types.rs')
| -rw-r--r-- | src/events/types.rs | 79 |
1 files changed, 4 insertions, 75 deletions
diff --git a/src/events/types.rs b/src/events/types.rs index d954512..aca3af4 100644 --- a/src/events/types.rs +++ b/src/events/types.rs @@ -1,84 +1,13 @@ -use std::collections::BTreeMap; - use crate::{ clock::DateTime, repo::{ channel::{self, Channel}, login::Login, message, + sequence::Sequence, }, }; -#[derive( - Debug, - Default, - Eq, - Ord, - PartialEq, - PartialOrd, - Clone, - Copy, - serde::Serialize, - serde::Deserialize, - sqlx::Type, -)] -#[serde(transparent)] -#[sqlx(transparent)] -pub struct Sequence(i64); - -impl Sequence { - pub fn next(self) -> Self { - let Self(current) = self; - Self(current + 1) - } -} - -// For the purposes of event replay, a resume point is a vector of resume -// elements. A resume element associates a channel (by ID) with the latest event -// seen in that channel so far. Replaying the event stream can restart at a -// predictable point - hence the name. These values can be serialized and sent -// to the client as JSON dicts, then rehydrated to recover the resume point at a -// later time. -// -// Using a sorted map ensures that there is a canonical representation for -// each resume point. -#[derive(Clone, Debug, Default, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize)] -#[serde(transparent)] -pub struct ResumePoint(BTreeMap<channel::Id, Sequence>); - -impl ResumePoint { - pub fn advance<'e>(&mut self, event: impl Into<ResumeElement<'e>>) { - let Self(elements) = self; - let ResumeElement(channel, sequence) = event.into(); - elements.insert(channel.clone(), sequence); - } - - pub fn forget<'e>(&mut self, event: impl Into<ResumeElement<'e>>) { - let Self(elements) = self; - let ResumeElement(channel, _) = event.into(); - elements.remove(channel); - } - - pub fn get(&self, channel: &channel::Id) -> Option<Sequence> { - let Self(elements) = self; - elements.get(channel).copied() - } - - pub fn not_after<'e>(&self, event: impl Into<ResumeElement<'e>>) -> bool { - let Self(elements) = self; - let ResumeElement(channel, sequence) = event.into(); - - elements - .get(channel) - .map_or(true, |resume_at| resume_at < &sequence) - } -} - -pub struct ResumeElement<'i>(&'i channel::Id, Sequence); - -#[derive(Clone, Debug)] -pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent); - #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct ChannelEvent { #[serde(skip)] @@ -92,7 +21,7 @@ impl ChannelEvent { pub fn created(channel: Channel) -> Self { Self { at: channel.created_at, - sequence: Sequence::default(), + sequence: channel.created_sequence, data: CreatedEvent { channel }.into(), } } @@ -107,9 +36,9 @@ impl ChannelEvent { } } -impl<'c> From<&'c ChannelEvent> for ResumeElement<'c> { +impl<'c> From<&'c ChannelEvent> for Sequence { fn from(event: &'c ChannelEvent) -> Self { - Self(event.channel_id(), event.sequence) + event.sequence } } |
