use std::collections::BTreeMap; use crate::{ clock::DateTime, repo::{ channel::{self, Channel}, login::Login, message, }, }; #[derive( Debug, Default, Eq, Ord, PartialEq, PartialOrd, Clone, Copy, serde::Serialize, serde::Deserialize, sqlx::Type, )] #[serde(transparent)] #[sqlx(transparent)] pub struct Sequence(i64); impl Sequence { pub fn next(self) -> Self { let Self(current) = self; Self(current + 1) } } // For the purposes of event replay, a resume point is a vector of resume // elements. A resume element associates a channel (by ID) with the latest event // seen in that channel so far. Replaying the event stream can restart at a // predictable point - hence the name. These values can be serialized and sent // to the client as JSON dicts, then rehydrated to recover the resume point at a // later time. // // Using a sorted map ensures that there is a canonical representation for // each resume point. #[derive(Clone, Debug, Default, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize)] #[serde(transparent)] pub struct ResumePoint(BTreeMap); impl ResumePoint { pub fn advance<'e>(&mut self, event: impl Into>) { let Self(elements) = self; let ResumeElement(channel, sequence) = event.into(); elements.insert(channel.clone(), sequence); } pub fn forget<'e>(&mut self, event: impl Into>) { let Self(elements) = self; let ResumeElement(channel, _) = event.into(); elements.remove(channel); } pub fn get(&self, channel: &channel::Id) -> Option { let Self(elements) = self; elements.get(channel).copied() } pub fn not_after<'e>(&self, event: impl Into>) -> bool { let Self(elements) = self; let ResumeElement(channel, sequence) = event.into(); elements .get(channel) .map_or(true, |resume_at| resume_at < &sequence) } } pub struct ResumeElement<'i>(&'i channel::Id, Sequence); #[derive(Clone, Debug)] pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent); #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct ChannelEvent { #[serde(skip)] pub sequence: Sequence, pub at: DateTime, #[serde(flatten)] pub data: ChannelEventData, } impl ChannelEvent { pub fn created(channel: Channel) -> Self { Self { at: channel.created_at, sequence: Sequence::default(), data: CreatedEvent { channel }.into(), } } pub fn channel_id(&self) -> &channel::Id { match &self.data { ChannelEventData::Created(event) => &event.channel.id, ChannelEventData::Message(event) => &event.channel.id, ChannelEventData::MessageDeleted(event) => &event.channel.id, ChannelEventData::Deleted(event) => &event.channel, } } } impl<'c> From<&'c ChannelEvent> for ResumeElement<'c> { fn from(event: &'c ChannelEvent) -> Self { Self(event.channel_id(), event.sequence) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum ChannelEventData { Created(CreatedEvent), Message(MessageEvent), MessageDeleted(MessageDeletedEvent), Deleted(DeletedEvent), } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct CreatedEvent { pub channel: Channel, } impl From for ChannelEventData { fn from(event: CreatedEvent) -> Self { Self::Created(event) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct MessageEvent { pub channel: Channel, pub sender: Login, pub message: message::Message, } impl From for ChannelEventData { fn from(event: MessageEvent) -> Self { Self::Message(event) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct MessageDeletedEvent { pub channel: Channel, pub message: message::Id, } impl From for ChannelEventData { fn from(event: MessageDeletedEvent) -> Self { Self::MessageDeleted(event) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct DeletedEvent { pub channel: channel::Id, } impl From for ChannelEventData { fn from(event: DeletedEvent) -> Self { Self::Deleted(event) } }