use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{Channel, History, Id}, clock::DateTime, event::{Instant, ResumePoint, Sequence}, }; pub trait Provider { fn channels(&mut self) -> Channels; } impl<'c> Provider for Transaction<'c, Sqlite> { fn channels(&mut self) -> Channels { Channels(self) } } pub struct Channels<'t>(&'t mut SqliteConnection); impl<'c> Channels<'c> { pub async fn create(&mut self, name: &str, created: &Instant) -> Result { let id = Id::generate(); let channel = sqlx::query!( r#" insert into channel (id, name, created_at, created_sequence) values ($1, $2, $3, $4) returning id as "id: Id", name as "name!", -- known non-null as we just set it created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" "#, id, name, created.at, created.sequence, ) .map(|row| History { channel: Channel { id: row.id, name: row.name, deleted_at: None, }, created: Instant::new(row.created_at, row.created_sequence), deleted: None, }) .fetch_one(&mut *self.0) .await?; Ok(channel) } pub async fn by_id(&mut self, channel: &Id) -> Result { let channel = sqlx::query!( r#" select id as "id: Id", channel.name, channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel left join channel_deleted as deleted using (id) where id = $1 "#, channel, ) .map(|row| History { channel: Channel { id: row.id, name: row.name.unwrap_or_default(), deleted_at: row.deleted_at, }, created: Instant::new(row.created_at, row.created_sequence), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_one(&mut *self.0) .await?; Ok(channel) } pub async fn all(&mut self, resume_at: ResumePoint) -> Result, sqlx::Error> { let channels = sqlx::query!( r#" select id as "id: Id", channel.name, channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at: DateTime", deleted.deleted_sequence as "deleted_sequence: Sequence" from channel left join channel_deleted as deleted using (id) where coalesce(channel.created_sequence <= $1, true) order by channel.name "#, resume_at, ) .map(|row| History { channel: Channel { id: row.id, name: row.name.unwrap_or_default(), deleted_at: row.deleted_at, }, created: Instant::new(row.created_at, row.created_sequence), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; Ok(channels) } pub async fn replay( &mut self, resume_at: Option, ) -> Result, sqlx::Error> { let channels = sqlx::query!( r#" select id as "id: Id", channel.name, channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at: DateTime", deleted.deleted_sequence as "deleted_sequence: Sequence" from channel left join channel_deleted as deleted using (id) where coalesce(channel.created_sequence > $1, true) "#, resume_at, ) .map(|row| History { channel: Channel { id: row.id, name: row.name.unwrap_or_default(), deleted_at: row.deleted_at, }, created: Instant::new(row.created_at, row.created_sequence), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; Ok(channels) } pub async fn delete( &mut self, channel: &History, deleted: &Instant, ) -> Result { let id = channel.id(); sqlx::query_scalar!( r#" insert into channel_deleted (id, deleted_at, deleted_sequence) values ($1, $2, $3) returning 1 as "deleted: bool" "#, id, deleted.at, deleted.sequence, ) .fetch_one(&mut *self.0) .await?; // Small social responsibility hack here: when a channel is deleted, its name is // retconned to have been the empty string. Someone reading the event stream // afterwards, or looking at channels via the API, cannot retrieve the // "deleted" channel's information by ignoring the deletion event. // // This also avoids the need for a separate name reservation table to ensure // that live channels have unique names, since the `channel` table's name field // is unique over non-null values. sqlx::query_scalar!( r#" update channel set name = null where id = $1 returning 1 as "updated: bool" "#, id, ) .fetch_one(&mut *self.0) .await?; let channel = self.by_id(id).await?; Ok(channel) } pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let channels = sqlx::query_scalar!( r#" with has_messages as ( select channel from message group by channel ) delete from channel_deleted where deleted_at < $1 and id not in has_messages returning id as "id: Id" "#, purge_at, ) .fetch_all(&mut *self.0) .await?; for channel in channels { // Wanted: a way to batch these up into one query. sqlx::query!( r#" delete from channel where id = $1 "#, channel, ) .execute(&mut *self.0) .await?; } Ok(()) } pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { let channels = sqlx::query!( r#" select channel.id as "id: Id", channel.name, channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel left join channel_deleted as deleted using (id) left join message where channel.created_at < $1 and message.id is null and deleted.id is null "#, expired_at, ) .map(|row| History { channel: Channel { id: row.id, name: row.name.unwrap_or_default(), deleted_at: row.deleted_at, }, created: Instant::new(row.created_at, row.created_sequence), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; Ok(channels) } }