use std::collections::BTreeMap; use crate::{ clock::DateTime, repo::{ channel::{self, Channel}, login::Login, message, }, }; #[derive( Debug, Default, Eq, Ord, PartialEq, PartialOrd, Clone, Copy, serde::Serialize, serde::Deserialize, sqlx::Type, )] #[serde(transparent)] #[sqlx(transparent)] pub struct Sequence(i64); impl Sequence { pub fn next(self) -> Self { let Self(current) = self; Self(current + 1) } } // For the purposes of event replay, an "event ID" is a vector of per-channel // sequence numbers. Replay will start with messages whose sequence number in // its channel is higher than the sequence in the event ID, or if the channel // is not listed in the event ID, then at the beginning. // // Using a sorted map ensures that there is a canonical representation for // each event ID. #[derive(Clone, Debug, Default, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize)] #[serde(transparent)] pub struct ResumePoint(BTreeMap); impl ResumePoint { pub fn singleton(channel: &channel::Id, sequence: Sequence) -> Self { let mut vector = Self::default(); vector.advance(channel, sequence); vector } pub fn advance(&mut self, channel: &channel::Id, sequence: Sequence) { let Self(elements) = self; elements.insert(channel.clone(), sequence); } pub fn get(&self, channel: &channel::Id) -> Option { let Self(elements) = self; elements.get(channel).copied() } pub fn not_after(&self, event: impl ResumeElement) -> bool { let Self(elements) = self; let (channel, sequence) = event.element(); elements .get(channel) .map_or(true, |resume_at| resume_at < &sequence) } } pub trait ResumeElement { fn element(&self) -> (&channel::Id, Sequence); } impl ResumeElement for &T where T: ResumeElement, { fn element(&self) -> (&channel::Id, Sequence) { (*self).element() } } #[derive(Clone, Debug)] pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent); #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct ChannelEvent { #[serde(skip)] pub sequence: Sequence, pub at: DateTime, pub channel: Channel, #[serde(flatten)] pub data: ChannelEventData, } impl ChannelEvent { pub fn created(channel: Channel) -> Self { Self { at: channel.created_at, sequence: Sequence::default(), channel, data: ChannelEventData::Created, } } } impl ResumeElement for ChannelEvent { fn element(&self) -> (&channel::Id, Sequence) { (&self.channel.id, self.sequence) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum ChannelEventData { Created, Message(MessageEvent), MessageDeleted(MessageDeletedEvent), } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct MessageEvent { pub sender: Login, pub message: message::Message, } impl From for ChannelEventData { fn from(message: MessageEvent) -> Self { Self::Message(message) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct MessageDeletedEvent { pub message: message::Id, } impl From for ChannelEventData { fn from(message: MessageDeletedEvent) -> Self { Self::MessageDeleted(message) } }