diff options
Diffstat (limited to 'src/events/repo/message.rs')
| -rw-r--r-- | src/events/repo/message.rs | 198 |
1 files changed, 0 insertions, 198 deletions
diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs deleted file mode 100644 index f8bae2b..0000000 --- a/src/events/repo/message.rs +++ /dev/null @@ -1,198 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::{ - clock::DateTime, - events::types::{self, Sequence}, - repo::{ - channel::{self, Channel}, - login::{self, Login}, - message::{self, Message}, - }, -}; - -pub trait Provider { - fn message_events(&mut self) -> Events; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn message_events(&mut self) -> Events { - Events(self) - } -} - -pub struct Events<'t>(&'t mut SqliteConnection); - -impl<'c> Events<'c> { - pub async fn create( - &mut self, - sender: &Login, - channel: &Channel, - body: &str, - sent_at: &DateTime, - ) -> Result<types::ChannelEvent, sqlx::Error> { - let sequence = self.assign_sequence(channel).await?; - - let id = message::Id::generate(); - - let message = sqlx::query!( - r#" - insert into message - (id, channel, sequence, sender, body, sent_at) - values ($1, $2, $3, $4, $5, $6) - returning - id as "id: message::Id", - sequence as "sequence: Sequence", - sender as "sender: login::Id", - body, - sent_at as "sent_at: DateTime" - "#, - id, - channel.id, - sequence, - sender.id, - body, - sent_at, - ) - .map(|row| types::ChannelEvent { - sequence: row.sequence, - at: row.sent_at, - data: types::MessageEvent { - channel: channel.clone(), - sender: sender.clone(), - message: Message { - id: row.id, - body: row.body, - }, - } - .into(), - }) - .fetch_one(&mut *self.0) - .await?; - - Ok(message) - } - - pub async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> { - let next = sqlx::query_scalar!( - r#" - update channel - set last_sequence = last_sequence + 1 - where id = $1 - returning last_sequence as "next_sequence: Sequence" - "#, - channel.id, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(next) - } - - pub async fn delete_expired( - &mut self, - channel: &Channel, - message: &message::Id, - sequence: Sequence, - deleted_at: &DateTime, - ) -> Result<types::ChannelEvent, sqlx::Error> { - sqlx::query_scalar!( - r#" - delete from message - where id = $1 - returning 1 as "row: i64" - "#, - message, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(types::ChannelEvent { - sequence, - at: *deleted_at, - data: types::MessageDeletedEvent { - channel: channel.clone(), - message: message.clone(), - } - .into(), - }) - } - - pub async fn expired( - &mut self, - expire_at: &DateTime, - ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> { - let messages = sqlx::query!( - r#" - select - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - channel.created_at as "channel_created_at: DateTime", - message.id as "message: message::Id" - from message - join channel on message.channel = channel.id - join login as sender on message.sender = sender.id - where sent_at < $1 - "#, - expire_at, - ) - .map(|row| { - ( - Channel { - id: row.channel_id, - name: row.channel_name, - created_at: row.channel_created_at, - }, - row.message, - ) - }) - .fetch_all(&mut *self.0) - .await?; - - Ok(messages) - } - - pub async fn replay( - &mut self, - channel: &Channel, - resume_at: Option<Sequence>, - ) -> Result<Vec<types::ChannelEvent>, sqlx::Error> { - let events = sqlx::query!( - r#" - select - message.id as "id: message::Id", - sequence as "sequence: Sequence", - login.id as "sender_id: login::Id", - login.name as sender_name, - message.body, - message.sent_at as "sent_at: DateTime" - from message - join login on message.sender = login.id - where channel = $1 - and coalesce(sequence > $2, true) - order by sequence asc - "#, - channel.id, - resume_at, - ) - .map(|row| types::ChannelEvent { - sequence: row.sequence, - at: row.sent_at, - data: types::MessageEvent { - channel: channel.clone(), - sender: login::Login { - id: row.sender_id, - name: row.sender_name, - }, - message: Message { - id: row.id, - body: row.body, - }, - } - .into(), - }) - .fetch_all(&mut *self.0) - .await?; - - Ok(events) - } -} |
