diff options
Diffstat (limited to 'src/events/types.rs')
| -rw-r--r-- | src/events/types.rs | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/src/events/types.rs b/src/events/types.rs new file mode 100644 index 0000000..d954512 --- /dev/null +++ b/src/events/types.rs @@ -0,0 +1,170 @@ +use std::collections::BTreeMap; + +use crate::{ + clock::DateTime, + repo::{ + channel::{self, Channel}, + login::Login, + message, + }, +}; + +#[derive( + Debug, + Default, + Eq, + Ord, + PartialEq, + PartialOrd, + Clone, + Copy, + serde::Serialize, + serde::Deserialize, + sqlx::Type, +)] +#[serde(transparent)] +#[sqlx(transparent)] +pub struct Sequence(i64); + +impl Sequence { + pub fn next(self) -> Self { + let Self(current) = self; + Self(current + 1) + } +} + +// For the purposes of event replay, a resume point is a vector of resume +// elements. A resume element associates a channel (by ID) with the latest event +// seen in that channel so far. Replaying the event stream can restart at a +// predictable point - hence the name. These values can be serialized and sent +// to the client as JSON dicts, then rehydrated to recover the resume point at a +// later time. +// +// Using a sorted map ensures that there is a canonical representation for +// each resume point. +#[derive(Clone, Debug, Default, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize)] +#[serde(transparent)] +pub struct ResumePoint(BTreeMap<channel::Id, Sequence>); + +impl ResumePoint { + pub fn advance<'e>(&mut self, event: impl Into<ResumeElement<'e>>) { + let Self(elements) = self; + let ResumeElement(channel, sequence) = event.into(); + elements.insert(channel.clone(), sequence); + } + + pub fn forget<'e>(&mut self, event: impl Into<ResumeElement<'e>>) { + let Self(elements) = self; + let ResumeElement(channel, _) = event.into(); + elements.remove(channel); + } + + pub fn get(&self, channel: &channel::Id) -> Option<Sequence> { + let Self(elements) = self; + elements.get(channel).copied() + } + + pub fn not_after<'e>(&self, event: impl Into<ResumeElement<'e>>) -> bool { + let Self(elements) = self; + let ResumeElement(channel, sequence) = event.into(); + + elements + .get(channel) + .map_or(true, |resume_at| resume_at < &sequence) + } +} + +pub struct ResumeElement<'i>(&'i channel::Id, Sequence); + +#[derive(Clone, Debug)] +pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent); + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct ChannelEvent { + #[serde(skip)] + pub sequence: Sequence, + pub at: DateTime, + #[serde(flatten)] + pub data: ChannelEventData, +} + +impl ChannelEvent { + pub fn created(channel: Channel) -> Self { + Self { + at: channel.created_at, + sequence: Sequence::default(), + data: CreatedEvent { channel }.into(), + } + } + + pub fn channel_id(&self) -> &channel::Id { + match &self.data { + ChannelEventData::Created(event) => &event.channel.id, + ChannelEventData::Message(event) => &event.channel.id, + ChannelEventData::MessageDeleted(event) => &event.channel.id, + ChannelEventData::Deleted(event) => &event.channel, + } + } +} + +impl<'c> From<&'c ChannelEvent> for ResumeElement<'c> { + fn from(event: &'c ChannelEvent) -> Self { + Self(event.channel_id(), event.sequence) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ChannelEventData { + Created(CreatedEvent), + Message(MessageEvent), + MessageDeleted(MessageDeletedEvent), + Deleted(DeletedEvent), +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct CreatedEvent { + pub channel: Channel, +} + +impl From<CreatedEvent> for ChannelEventData { + fn from(event: CreatedEvent) -> Self { + Self::Created(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct MessageEvent { + pub channel: Channel, + pub sender: Login, + pub message: message::Message, +} + +impl From<MessageEvent> for ChannelEventData { + fn from(event: MessageEvent) -> Self { + Self::Message(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct MessageDeletedEvent { + pub channel: Channel, + pub message: message::Id, +} + +impl From<MessageDeletedEvent> for ChannelEventData { + fn from(event: MessageDeletedEvent) -> Self { + Self::MessageDeleted(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct DeletedEvent { + pub channel: channel::Id, +} + +impl From<DeletedEvent> for ChannelEventData { + fn from(event: DeletedEvent) -> Self { + Self::Deleted(event) + } +} |
