diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-27 18:17:02 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-27 19:59:22 -0400 |
| commit | eff129bc1f29bcb1b2b9d10c6b49ab886edc83d6 (patch) | |
| tree | b82892a6cf40f771998a85e5530012bab80157dc /src/events/repo | |
| parent | 68e3dce3c2e588376c6510783e908941360ac80e (diff) | |
Make `/api/events` a firehose endpoint.
It now includes events for all channels. Clients are responsible for filtering.
The schema for channel events has changed; it now includes a channel name and ID, in the same format as the sender's name and ID. They also now include a `"type"` field, whose only valid value (as of this writing) is `"message"`.
This is groundwork for delivering message deletion (expiry) events to clients, and notifying clients of channel lifecycle events.
Diffstat (limited to 'src/events/repo')
| -rw-r--r-- | src/events/repo/message.rs (renamed from src/events/repo/broadcast.rs) | 83 | ||||
| -rw-r--r-- | src/events/repo/mod.rs | 2 |
2 files changed, 34 insertions, 51 deletions
diff --git a/src/events/repo/broadcast.rs b/src/events/repo/message.rs index 6914573..b4724ea 100644 --- a/src/events/repo/broadcast.rs +++ b/src/events/repo/message.rs @@ -2,6 +2,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ clock::DateTime, + events::types::{self, Sequence}, repo::{ channel::Channel, login::{self, Login}, @@ -10,35 +11,25 @@ use crate::{ }; pub trait Provider { - fn broadcast(&mut self) -> Broadcast; + fn message_events(&mut self) -> Events; } impl<'c> Provider for Transaction<'c, Sqlite> { - fn broadcast(&mut self) -> Broadcast { - Broadcast(self) + fn message_events(&mut self) -> Events { + Events(self) } } -pub struct Broadcast<'t>(&'t mut SqliteConnection); +pub struct Events<'t>(&'t mut SqliteConnection); -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Message { - pub id: message::Id, - #[serde(skip)] - pub sequence: Sequence, - pub sender: Login, - pub body: String, - pub sent_at: DateTime, -} - -impl<'c> Broadcast<'c> { +impl<'c> Events<'c> { pub async fn create( &mut self, sender: &Login, channel: &Channel, body: &str, sent_at: &DateTime, - ) -> Result<Message, sqlx::Error> { + ) -> Result<types::ChannelEvent, sqlx::Error> { let sequence = self.next_sequence_for(channel).await?; let id = message::Id::generate(); @@ -62,12 +53,16 @@ impl<'c> Broadcast<'c> { body, sent_at, ) - .map(|row| Message { - id: row.id, + .map(|row| types::ChannelEvent { sequence: row.sequence, - sender: sender.clone(), - body: row.body, - sent_at: row.sent_at, + at: row.sent_at, + channel: channel.clone(), + data: types::MessageEvent { + id: row.id, + sender: sender.clone(), + body: row.body, + } + .into(), }) .fetch_one(&mut *self.0) .await?; @@ -76,7 +71,7 @@ impl<'c> Broadcast<'c> { } async fn next_sequence_for(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> { - let Sequence(current) = sqlx::query_scalar!( + let current = sqlx::query_scalar!( r#" -- `max` never returns null, but sqlx can't detect that select max(sequence) as "sequence!: Sequence" @@ -88,7 +83,7 @@ impl<'c> Broadcast<'c> { .fetch_one(&mut *self.0) .await?; - Ok(Sequence(current + 1)) + Ok(current.next()) } pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { @@ -109,8 +104,8 @@ impl<'c> Broadcast<'c> { &mut self, channel: &Channel, resume_at: Option<Sequence>, - ) -> Result<Vec<Message>, sqlx::Error> { - let messages = sqlx::query!( + ) -> Result<Vec<types::ChannelEvent>, sqlx::Error> { + let events = sqlx::query!( r#" select message.id as "id: message::Id", @@ -128,35 +123,23 @@ impl<'c> Broadcast<'c> { channel.id, resume_at, ) - .map(|row| Message { - id: row.id, + .map(|row| types::ChannelEvent { sequence: row.sequence, - sender: Login { - id: row.sender_id, - name: row.sender_name, - }, - body: row.body, - sent_at: row.sent_at, + at: row.sent_at, + channel: channel.clone(), + data: types::MessageEvent { + id: row.id, + sender: login::Login { + id: row.sender_id, + name: row.sender_name, + }, + body: row.body, + } + .into(), }) .fetch_all(&mut *self.0) .await?; - Ok(messages) + Ok(events) } } - -#[derive( - Debug, - Eq, - Ord, - PartialEq, - PartialOrd, - Clone, - Copy, - serde::Serialize, - serde::Deserialize, - sqlx::Type, -)] -#[serde(transparent)] -#[sqlx(transparent)] -pub struct Sequence(i64); diff --git a/src/events/repo/mod.rs b/src/events/repo/mod.rs index 2ed3062..e216a50 100644 --- a/src/events/repo/mod.rs +++ b/src/events/repo/mod.rs @@ -1 +1 @@ -pub mod broadcast; +pub mod message; |
