diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-27 18:17:02 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-27 19:59:22 -0400 |
| commit | eff129bc1f29bcb1b2b9d10c6b49ab886edc83d6 (patch) | |
| tree | b82892a6cf40f771998a85e5530012bab80157dc /src/events/repo/message.rs | |
| parent | 68e3dce3c2e588376c6510783e908941360ac80e (diff) | |
Make `/api/events` a firehose endpoint.
It now includes events for all channels. Clients are responsible for filtering.
The schema for channel events has changed; it now includes a channel name and ID, in the same format as the sender's name and ID. They also now include a `"type"` field, whose only valid value (as of this writing) is `"message"`.
This is groundwork for delivering message deletion (expiry) events to clients, and notifying clients of channel lifecycle events.
Diffstat (limited to 'src/events/repo/message.rs')
| -rw-r--r-- | src/events/repo/message.rs | 145 |
1 files changed, 145 insertions, 0 deletions
diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs new file mode 100644 index 0000000..b4724ea --- /dev/null +++ b/src/events/repo/message.rs @@ -0,0 +1,145 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::{ + clock::DateTime, + events::types::{self, Sequence}, + repo::{ + channel::Channel, + login::{self, Login}, + message, + }, +}; + +pub trait Provider { + fn message_events(&mut self) -> Events; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn message_events(&mut self) -> Events { + Events(self) + } +} + +pub struct Events<'t>(&'t mut SqliteConnection); + +impl<'c> Events<'c> { + pub async fn create( + &mut self, + sender: &Login, + channel: &Channel, + body: &str, + sent_at: &DateTime, + ) -> Result<types::ChannelEvent, sqlx::Error> { + let sequence = self.next_sequence_for(channel).await?; + + let id = message::Id::generate(); + + let message = sqlx::query!( + r#" + insert into message + (id, channel, sequence, sender, body, sent_at) + values ($1, $2, $3, $4, $5, $6) + returning + id as "id: message::Id", + sequence as "sequence: Sequence", + sender as "sender: login::Id", + body, + sent_at as "sent_at: DateTime" + "#, + id, + channel.id, + sequence, + sender.id, + body, + sent_at, + ) + .map(|row| types::ChannelEvent { + sequence: row.sequence, + at: row.sent_at, + channel: channel.clone(), + data: types::MessageEvent { + id: row.id, + sender: sender.clone(), + body: row.body, + } + .into(), + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(message) + } + + async fn next_sequence_for(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> { + let current = sqlx::query_scalar!( + r#" + -- `max` never returns null, but sqlx can't detect that + select max(sequence) as "sequence!: Sequence" + from message + where channel = $1 + "#, + channel.id, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(current.next()) + } + + pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { + sqlx::query!( + r#" + delete from message + where sent_at < $1 + "#, + expire_at, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) + } + + pub async fn replay( + &mut self, + channel: &Channel, + resume_at: Option<Sequence>, + ) -> Result<Vec<types::ChannelEvent>, sqlx::Error> { + let events = sqlx::query!( + r#" + select + message.id as "id: message::Id", + sequence as "sequence: Sequence", + login.id as "sender_id: login::Id", + login.name as sender_name, + message.body, + message.sent_at as "sent_at: DateTime" + from message + join login on message.sender = login.id + where channel = $1 + and coalesce(sequence > $2, true) + order by sequence asc + "#, + channel.id, + resume_at, + ) + .map(|row| types::ChannelEvent { + sequence: row.sequence, + at: row.sent_at, + channel: channel.clone(), + data: types::MessageEvent { + id: row.id, + sender: login::Login { + id: row.sender_id, + name: row.sender_name, + }, + body: row.body, + } + .into(), + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(events) + } +} |
