use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{self, Channel}, clock::DateTime, event::{types, Sequence}, login::{self, Login}, message::{self, Message}, }; pub trait Provider { fn message_events(&mut self) -> Events; } impl<'c> Provider for Transaction<'c, Sqlite> { fn message_events(&mut self) -> Events { Events(self) } } pub struct Events<'t>(&'t mut SqliteConnection); impl<'c> Events<'c> { pub async fn create( &mut self, sender: &Login, channel: &Channel, sent_at: &DateTime, sent_sequence: Sequence, body: &str, ) -> Result { let id = message::Id::generate(); let message = sqlx::query!( r#" insert into message (id, channel, sender, sent_at, sent_sequence, body) values ($1, $2, $3, $4, $5, $6) returning id as "id: message::Id", sender as "sender: login::Id", sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence", body "#, id, channel.id, sender.id, sent_at, sent_sequence, body, ) .map(|row| types::ChannelEvent { sequence: row.sent_sequence, at: row.sent_at, data: types::MessageEvent { channel: channel.clone(), sender: sender.clone(), message: Message { id: row.id, body: row.body, }, } .into(), }) .fetch_one(&mut *self.0) .await?; Ok(message) } pub async fn delete( &mut self, channel: &Channel, message: &message::Id, deleted_at: &DateTime, deleted_sequence: Sequence, ) -> Result { sqlx::query_scalar!( r#" delete from message where id = $1 returning 1 as "row: i64" "#, message, ) .fetch_one(&mut *self.0) .await?; Ok(types::ChannelEvent { sequence: deleted_sequence, at: *deleted_at, data: types::MessageDeletedEvent { channel: channel.clone(), message: message.clone(), } .into(), }) } pub async fn expired( &mut self, expire_at: &DateTime, ) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select channel.id as "channel_id: channel::Id", channel.name as "channel_name", channel.created_at as "channel_created_at: DateTime", channel.created_sequence as "channel_created_sequence: Sequence", message.id as "message: message::Id" from message join channel on message.channel = channel.id join login as sender on message.sender = sender.id where sent_at < $1 "#, expire_at, ) .map(|row| { ( Channel { id: row.channel_id, name: row.channel_name, created_at: row.channel_created_at, created_sequence: row.channel_created_sequence, }, row.message, ) }) .fetch_all(&mut *self.0) .await?; Ok(messages) } pub async fn replay( &mut self, resume_at: Option, ) -> Result, sqlx::Error> { let events = sqlx::query!( r#" select message.id as "id: message::Id", channel.id as "channel_id: channel::Id", channel.name as "channel_name", channel.created_at as "channel_created_at: DateTime", channel.created_sequence as "channel_created_sequence: Sequence", sender.id as "sender_id: login::Id", sender.name as sender_name, message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", message.body from message join channel on message.channel = channel.id join login as sender on message.sender = sender.id where coalesce(message.sent_sequence > $1, true) order by sent_sequence asc "#, resume_at, ) .map(|row| types::ChannelEvent { sequence: row.sent_sequence, at: row.sent_at, data: types::MessageEvent { channel: Channel { id: row.channel_id, name: row.channel_name, created_at: row.channel_created_at, created_sequence: row.channel_created_sequence, }, sender: Login { id: row.sender_id, name: row.sender_name, }, message: Message { id: row.id, body: row.body, }, } .into(), }) .fetch_all(&mut *self.0) .await?; Ok(events) } }