diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-27 18:17:02 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-27 19:59:22 -0400 |
| commit | eff129bc1f29bcb1b2b9d10c6b49ab886edc83d6 (patch) | |
| tree | b82892a6cf40f771998a85e5530012bab80157dc /src/events/types.rs | |
| parent | 68e3dce3c2e588376c6510783e908941360ac80e (diff) | |
Make `/api/events` a firehose endpoint.
It now includes events for all channels. Clients are responsible for filtering.
The schema for channel events has changed; it now includes a channel name and ID, in the same format as the sender's name and ID. They also now include a `"type"` field, whose only valid value (as of this writing) is `"message"`.
This is groundwork for delivering message deletion (expiry) events to clients, and notifying clients of channel lifecycle events.
Diffstat (limited to 'src/events/types.rs')
| -rw-r--r-- | src/events/types.rs | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/src/events/types.rs b/src/events/types.rs new file mode 100644 index 0000000..6747afc --- /dev/null +++ b/src/events/types.rs @@ -0,0 +1,99 @@ +use std::collections::BTreeMap; + +use crate::{ + clock::DateTime, + repo::{ + channel::{self, Channel}, + login::Login, + message, + }, +}; + +#[derive( + Debug, + Eq, + Ord, + PartialEq, + PartialOrd, + Clone, + Copy, + serde::Serialize, + serde::Deserialize, + sqlx::Type, +)] +#[serde(transparent)] +#[sqlx(transparent)] +pub struct Sequence(i64); + +impl Sequence { + pub fn next(self) -> Self { + let Self(current) = self; + Self(current + 1) + } +} + +// For the purposes of event replay, an "event ID" is a vector of per-channel +// sequence numbers. Replay will start with messages whose sequence number in +// its channel is higher than the sequence in the event ID, or if the channel +// is not listed in the event ID, then at the beginning. +// +// Using a sorted map ensures that there is a canonical representation for +// each event ID. +#[derive(Clone, Debug, Default, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize)] +#[serde(transparent)] +pub struct ResumePoint(BTreeMap<channel::Id, Sequence>); + +impl ResumePoint { + pub fn singleton(channel: &channel::Id, sequence: Sequence) -> Self { + let mut vector = Self::default(); + vector.advance(channel, sequence); + vector + } + + pub fn advance(&mut self, channel: &channel::Id, sequence: Sequence) { + let Self(elements) = self; + elements.insert(channel.clone(), sequence); + } + + pub fn get(&self, channel: &channel::Id) -> Option<Sequence> { + let Self(elements) = self; + elements.get(channel).copied() + } +} +#[derive(Clone, Debug)] +pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent); + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct ChannelEvent { + #[serde(skip)] + pub sequence: Sequence, + pub at: DateTime, + pub channel: Channel, + #[serde(flatten)] + pub data: ChannelEventData, +} + +impl ChannelEvent { + pub fn sequence(&self) -> ResumePoint { + ResumePoint::singleton(&self.channel.id, self.sequence) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ChannelEventData { + Message(MessageEvent), +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct MessageEvent { + pub id: message::Id, + pub sender: Login, + pub body: String, +} + +impl From<MessageEvent> for ChannelEventData { + fn from(message: MessageEvent) -> Self { + Self::Message(message) + } +} |
