use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ clock::DateTime, events::types::{self, Sequence}, repo::{ channel::{self, Channel}, login::{self, Login}, message::{self, Message}, }, }; pub trait Provider { fn message_events(&mut self) -> Events; } impl<'c> Provider for Transaction<'c, Sqlite> { fn message_events(&mut self) -> Events { Events(self) } } pub struct Events<'t>(&'t mut SqliteConnection); impl<'c> Events<'c> { pub async fn create( &mut self, sender: &Login, channel: &Channel, body: &str, sent_at: &DateTime, ) -> Result { let sequence = self.assign_sequence(channel).await?; let id = message::Id::generate(); let message = sqlx::query!( r#" insert into message (id, channel, sequence, sender, body, sent_at) values ($1, $2, $3, $4, $5, $6) returning id as "id: message::Id", sequence as "sequence: Sequence", sender as "sender: login::Id", body, sent_at as "sent_at: DateTime" "#, id, channel.id, sequence, sender.id, body, sent_at, ) .map(|row| types::ChannelEvent { sequence: row.sequence, at: row.sent_at, channel: channel.clone(), data: types::MessageEvent { sender: sender.clone(), message: Message { id: row.id, body: row.body, }, } .into(), }) .fetch_one(&mut *self.0) .await?; Ok(message) } async fn assign_sequence(&mut self, channel: &Channel) -> Result { let next = sqlx::query_scalar!( r#" update channel set last_sequence = last_sequence + 1 where id = $1 returning last_sequence as "next_sequence: Sequence" "#, channel.id, ) .fetch_one(&mut *self.0) .await?; Ok(next) } pub async fn delete_expired( &mut self, channel: &Channel, message: &message::Id, deleted_at: &DateTime, ) -> Result { let sequence = self.assign_sequence(channel).await?; sqlx::query_scalar!( r#" delete from message where id = $1 returning 1 as "row: i64" "#, message, ) .fetch_one(&mut *self.0) .await?; Ok(types::ChannelEvent { sequence, at: *deleted_at, channel: channel.clone(), data: types::MessageDeletedEvent { message: message.clone(), } .into(), }) } pub async fn expired( &mut self, expire_at: &DateTime, ) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select channel.id as "channel_id: channel::Id", channel.name as "channel_name", channel.created_at as "channel_created_at: DateTime", message.id as "message: message::Id" from message join channel on message.channel = channel.id join login as sender on message.sender = sender.id where sent_at < $1 "#, expire_at, ) .map(|row| { ( Channel { id: row.channel_id, name: row.channel_name, created_at: row.channel_created_at, }, row.message, ) }) .fetch_all(&mut *self.0) .await?; Ok(messages) } pub async fn replay( &mut self, channel: &Channel, resume_at: Option, ) -> Result, sqlx::Error> { let events = sqlx::query!( r#" select message.id as "id: message::Id", sequence as "sequence: Sequence", login.id as "sender_id: login::Id", login.name as sender_name, message.body, message.sent_at as "sent_at: DateTime" from message join login on message.sender = login.id where channel = $1 and coalesce(sequence > $2, true) order by sequence asc "#, channel.id, resume_at, ) .map(|row| types::ChannelEvent { sequence: row.sequence, at: row.sent_at, channel: channel.clone(), data: types::MessageEvent { sender: login::Login { id: row.sender_id, name: row.sender_name, }, message: Message { id: row.id, body: row.body, }, } .into(), }) .fetch_all(&mut *self.0) .await?; Ok(events) } }