use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{Channel, History, Id}, clock::DateTime, event::{Instant, ResumePoint, Sequence}, }; pub trait Provider { fn channels(&mut self) -> Channels; } impl<'c> Provider for Transaction<'c, Sqlite> { fn channels(&mut self) -> Channels { Channels(self) } } pub struct Channels<'t>(&'t mut SqliteConnection); impl<'c> Channels<'c> { pub async fn create(&mut self, name: &str, created: &Instant) -> Result { let id = Id::generate(); let channel = sqlx::query!( r#" insert into channel (id, name, created_at, created_sequence) values ($1, $2, $3, $4) returning id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" "#, id, name, created.at, created.sequence, ) .map(|row| History { channel: Channel { id: row.id, name: row.name, }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, deleted: None, }) .fetch_one(&mut *self.0) .await?; Ok(channel) } pub async fn by_id(&mut self, channel: &Id) -> Result { let channel = sqlx::query!( r#" select id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" from channel where id = $1 "#, channel, ) .map(|row| History { channel: Channel { id: row.id, name: row.name, }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, deleted: None, }) .fetch_one(&mut *self.0) .await?; Ok(channel) } pub async fn all(&mut self, resume_at: ResumePoint) -> Result, sqlx::Error> { let channels = sqlx::query!( r#" select id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" from channel where coalesce(created_sequence <= $1, true) order by channel.name "#, resume_at, ) .map(|row| History { channel: Channel { id: row.id, name: row.name, }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, deleted: None, }) .fetch_all(&mut *self.0) .await?; Ok(channels) } pub async fn replay( &mut self, resume_at: Option, ) -> Result, sqlx::Error> { let channels = sqlx::query!( r#" select id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" from channel where coalesce(created_sequence > $1, true) "#, resume_at, ) .map(|row| History { channel: Channel { id: row.id, name: row.name, }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, deleted: None, }) .fetch_all(&mut *self.0) .await?; Ok(channels) } pub async fn delete( &mut self, channel: &Id, deleted: &Instant, ) -> Result { let channel = sqlx::query!( r#" delete from channel where id = $1 returning id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" "#, channel, ) .map(|row| History { channel: Channel { id: row.id, name: row.name, }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, deleted: Some(*deleted), }) .fetch_one(&mut *self.0) .await?; Ok(channel) } pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { let channels = sqlx::query_scalar!( r#" select channel.id as "id: Id" from channel left join message where created_at < $1 and message.id is null "#, expired_at, ) .fetch_all(&mut *self.0) .await?; Ok(channels) } }