use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{Channel, Id}, clock::DateTime, event::{types, Sequence}, }; pub trait Provider { fn channels(&mut self) -> Channels; } impl<'c> Provider for Transaction<'c, Sqlite> { fn channels(&mut self) -> Channels { Channels(self) } } pub struct Channels<'t>(&'t mut SqliteConnection); impl<'c> Channels<'c> { pub async fn create( &mut self, name: &str, created_at: &DateTime, created_sequence: Sequence, ) -> Result { let id = Id::generate(); let channel = sqlx::query_as!( Channel, r#" insert into channel (id, name, created_at, created_sequence) values ($1, $2, $3, $4) returning id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" "#, id, name, created_at, created_sequence, ) .fetch_one(&mut *self.0) .await?; Ok(channel) } pub async fn by_id(&mut self, channel: &Id) -> Result { let channel = sqlx::query_as!( Channel, r#" select id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" from channel where id = $1 "#, channel, ) .fetch_one(&mut *self.0) .await?; Ok(channel) } pub async fn all( &mut self, resume_point: Option, ) -> Result, sqlx::Error> { let channels = sqlx::query_as!( Channel, r#" select id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" from channel where coalesce(created_sequence <= $1, true) order by channel.name "#, resume_point, ) .fetch_all(&mut *self.0) .await?; Ok(channels) } pub async fn replay( &mut self, resume_at: Option, ) -> Result, sqlx::Error> { let channels = sqlx::query_as!( Channel, r#" select id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" from channel where coalesce(created_sequence > $1, true) "#, resume_at, ) .fetch_all(&mut *self.0) .await?; Ok(channels) } pub async fn delete( &mut self, channel: &Channel, deleted_at: &DateTime, deleted_sequence: Sequence, ) -> Result { let channel = channel.id.clone(); sqlx::query_scalar!( r#" delete from channel where id = $1 returning 1 as "row: i64" "#, channel, ) .fetch_one(&mut *self.0) .await?; Ok(types::ChannelEvent { sequence: deleted_sequence, at: *deleted_at, data: types::DeletedEvent { channel }.into(), }) } pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { let channels = sqlx::query_as!( Channel, r#" select channel.id as "id: Id", channel.name, channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence" from channel left join message where created_at < $1 and message.id is null "#, expired_at, ) .fetch_all(&mut *self.0) .await?; Ok(channels) } }