diff options
| author | Kit La Touche <kit@transneptune.net> | 2024-09-28 21:55:50 -0400 |
|---|---|---|
| committer | Kit La Touche <kit@transneptune.net> | 2024-09-28 21:55:50 -0400 |
| commit | 897eef0306917baf3662e691b29f182d35805296 (patch) | |
| tree | 024e2a3fa13ac96e0b4339a6d62ae533efe7db07 /src/events/repo | |
| parent | c524b333befc8cc97aa49f73b3ed28bc3b82420c (diff) | |
| parent | 4d0bb0709b168a24ab6a8dbc86da45d7503596ee (diff) | |
Merge branch 'main' into feature-frontend
Diffstat (limited to 'src/events/repo')
| -rw-r--r-- | src/events/repo/broadcast.rs | 162 | ||||
| -rw-r--r-- | src/events/repo/message.rs | 198 | ||||
| -rw-r--r-- | src/events/repo/mod.rs | 2 |
3 files changed, 199 insertions, 163 deletions
diff --git a/src/events/repo/broadcast.rs b/src/events/repo/broadcast.rs deleted file mode 100644 index 6914573..0000000 --- a/src/events/repo/broadcast.rs +++ /dev/null @@ -1,162 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::{ - clock::DateTime, - repo::{ - channel::Channel, - login::{self, Login}, - message, - }, -}; - -pub trait Provider { - fn broadcast(&mut self) -> Broadcast; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn broadcast(&mut self) -> Broadcast { - Broadcast(self) - } -} - -pub struct Broadcast<'t>(&'t mut SqliteConnection); - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Message { - pub id: message::Id, - #[serde(skip)] - pub sequence: Sequence, - pub sender: Login, - pub body: String, - pub sent_at: DateTime, -} - -impl<'c> Broadcast<'c> { - pub async fn create( - &mut self, - sender: &Login, - channel: &Channel, - body: &str, - sent_at: &DateTime, - ) -> Result<Message, sqlx::Error> { - let sequence = self.next_sequence_for(channel).await?; - - let id = message::Id::generate(); - - let message = sqlx::query!( - r#" - insert into message - (id, channel, sequence, sender, body, sent_at) - values ($1, $2, $3, $4, $5, $6) - returning - id as "id: message::Id", - sequence as "sequence: Sequence", - sender as "sender: login::Id", - body, - sent_at as "sent_at: DateTime" - "#, - id, - channel.id, - sequence, - sender.id, - body, - sent_at, - ) - .map(|row| Message { - id: row.id, - sequence: row.sequence, - sender: sender.clone(), - body: row.body, - sent_at: row.sent_at, - }) - .fetch_one(&mut *self.0) - .await?; - - Ok(message) - } - - async fn next_sequence_for(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> { - let Sequence(current) = sqlx::query_scalar!( - r#" - -- `max` never returns null, but sqlx can't detect that - select max(sequence) as "sequence!: Sequence" - from message - where channel = $1 - "#, - channel.id, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(Sequence(current + 1)) - } - - pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { - sqlx::query!( - r#" - delete from message - where sent_at < $1 - "#, - expire_at, - ) - .execute(&mut *self.0) - .await?; - - Ok(()) - } - - pub async fn replay( - &mut self, - channel: &Channel, - resume_at: Option<Sequence>, - ) -> Result<Vec<Message>, sqlx::Error> { - let messages = sqlx::query!( - r#" - select - message.id as "id: message::Id", - sequence as "sequence: Sequence", - login.id as "sender_id: login::Id", - login.name as sender_name, - message.body, - message.sent_at as "sent_at: DateTime" - from message - join login on message.sender = login.id - where channel = $1 - and coalesce(sequence > $2, true) - order by sequence asc - "#, - channel.id, - resume_at, - ) - .map(|row| Message { - id: row.id, - sequence: row.sequence, - sender: Login { - id: row.sender_id, - name: row.sender_name, - }, - body: row.body, - sent_at: row.sent_at, - }) - .fetch_all(&mut *self.0) - .await?; - - Ok(messages) - } -} - -#[derive( - Debug, - Eq, - Ord, - PartialEq, - PartialOrd, - Clone, - Copy, - serde::Serialize, - serde::Deserialize, - sqlx::Type, -)] -#[serde(transparent)] -#[sqlx(transparent)] -pub struct Sequence(i64); diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs new file mode 100644 index 0000000..f8bae2b --- /dev/null +++ b/src/events/repo/message.rs @@ -0,0 +1,198 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::{ + clock::DateTime, + events::types::{self, Sequence}, + repo::{ + channel::{self, Channel}, + login::{self, Login}, + message::{self, Message}, + }, +}; + +pub trait Provider { + fn message_events(&mut self) -> Events; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn message_events(&mut self) -> Events { + Events(self) + } +} + +pub struct Events<'t>(&'t mut SqliteConnection); + +impl<'c> Events<'c> { + pub async fn create( + &mut self, + sender: &Login, + channel: &Channel, + body: &str, + sent_at: &DateTime, + ) -> Result<types::ChannelEvent, sqlx::Error> { + let sequence = self.assign_sequence(channel).await?; + + let id = message::Id::generate(); + + let message = sqlx::query!( + r#" + insert into message + (id, channel, sequence, sender, body, sent_at) + values ($1, $2, $3, $4, $5, $6) + returning + id as "id: message::Id", + sequence as "sequence: Sequence", + sender as "sender: login::Id", + body, + sent_at as "sent_at: DateTime" + "#, + id, + channel.id, + sequence, + sender.id, + body, + sent_at, + ) + .map(|row| types::ChannelEvent { + sequence: row.sequence, + at: row.sent_at, + data: types::MessageEvent { + channel: channel.clone(), + sender: sender.clone(), + message: Message { + id: row.id, + body: row.body, + }, + } + .into(), + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(message) + } + + pub async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> { + let next = sqlx::query_scalar!( + r#" + update channel + set last_sequence = last_sequence + 1 + where id = $1 + returning last_sequence as "next_sequence: Sequence" + "#, + channel.id, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(next) + } + + pub async fn delete_expired( + &mut self, + channel: &Channel, + message: &message::Id, + sequence: Sequence, + deleted_at: &DateTime, + ) -> Result<types::ChannelEvent, sqlx::Error> { + sqlx::query_scalar!( + r#" + delete from message + where id = $1 + returning 1 as "row: i64" + "#, + message, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(types::ChannelEvent { + sequence, + at: *deleted_at, + data: types::MessageDeletedEvent { + channel: channel.clone(), + message: message.clone(), + } + .into(), + }) + } + + pub async fn expired( + &mut self, + expire_at: &DateTime, + ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + channel.created_at as "channel_created_at: DateTime", + message.id as "message: message::Id" + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id + where sent_at < $1 + "#, + expire_at, + ) + .map(|row| { + ( + Channel { + id: row.channel_id, + name: row.channel_name, + created_at: row.channel_created_at, + }, + row.message, + ) + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } + + pub async fn replay( + &mut self, + channel: &Channel, + resume_at: Option<Sequence>, + ) -> Result<Vec<types::ChannelEvent>, sqlx::Error> { + let events = sqlx::query!( + r#" + select + message.id as "id: message::Id", + sequence as "sequence: Sequence", + login.id as "sender_id: login::Id", + login.name as sender_name, + message.body, + message.sent_at as "sent_at: DateTime" + from message + join login on message.sender = login.id + where channel = $1 + and coalesce(sequence > $2, true) + order by sequence asc + "#, + channel.id, + resume_at, + ) + .map(|row| types::ChannelEvent { + sequence: row.sequence, + at: row.sent_at, + data: types::MessageEvent { + channel: channel.clone(), + sender: login::Login { + id: row.sender_id, + name: row.sender_name, + }, + message: Message { + id: row.id, + body: row.body, + }, + } + .into(), + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(events) + } +} diff --git a/src/events/repo/mod.rs b/src/events/repo/mod.rs index 2ed3062..e216a50 100644 --- a/src/events/repo/mod.rs +++ b/src/events/repo/mod.rs @@ -1 +1 @@ -pub mod broadcast; +pub mod message; |
