use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; use crate::{ channel::{Channel, History, Id}, clock::DateTime, db::NotFound, event::{Instant, Sequence}, name::{self, Name}, }; pub trait Provider { fn channels(&mut self) -> Channels; } impl Provider for Transaction<'_, Sqlite> { fn channels(&mut self) -> Channels { Channels(self) } } pub struct Channels<'t>(&'t mut SqliteConnection); impl Channels<'_> { pub async fn create(&mut self, name: &Name, created: &Instant) -> Result { let id = Id::generate(); let name = name.clone(); let display_name = name.display(); let canonical_name = name.canonical(); let created = *created; sqlx::query!( r#" insert into channel (id, created_at, created_sequence, last_sequence) values ($1, $2, $3, $4) "#, id, created.at, created.sequence, created.sequence, ) .execute(&mut *self.0) .await?; sqlx::query!( r#" insert into channel_name (id, display_name, canonical_name) values ($1, $2, $3) "#, id, display_name, canonical_name, ) .execute(&mut *self.0) .await?; let channel = History { channel: Channel { id, name: name.clone(), deleted_at: None, }, created, deleted: None, }; Ok(channel) } pub async fn by_id(&mut self, channel: &Id) -> Result { let channel = sqlx::query!( r#" select id as "id: Id", name.display_name as "display_name?: String", name.canonical_name as "canonical_name?: String", channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel left join channel_name as name using (id) left join channel_deleted as deleted using (id) where id = $1 "#, channel, ) .map(|row| { Ok::<_, name::Error>(History { channel: Channel { id: row.id, name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), deleted_at: row.deleted_at, }, created: Instant::new(row.created_at, row.created_sequence), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) }) .fetch_one(&mut *self.0) .await??; Ok(channel) } pub async fn all(&mut self, resume_at: Sequence) -> Result, LoadError> { let channels = sqlx::query!( r#" select id as "id: Id", name.display_name as "display_name?: String", name.canonical_name as "canonical_name?: String", channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel left join channel_name as name using (id) left join channel_deleted as deleted using (id) where channel.created_sequence <= $1 order by name.canonical_name "#, resume_at, ) .map(|row| { Ok::<_, name::Error>(History { channel: Channel { id: row.id, name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), deleted_at: row.deleted_at, }, created: Instant::new(row.created_at, row.created_sequence), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) }) .fetch(&mut *self.0) .map(|res| Ok::<_, LoadError>(res??)) .try_collect() .await?; Ok(channels) } pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { let channels = sqlx::query!( r#" select id as "id: Id", name.display_name as "display_name?: String", name.canonical_name as "canonical_name?: String", channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel left join channel_name as name using (id) left join channel_deleted as deleted using (id) where channel.last_sequence > $1 "#, resume_at, ) .map(|row| { Ok::<_, name::Error>(History { channel: Channel { id: row.id, name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), deleted_at: row.deleted_at, }, created: Instant::new(row.created_at, row.created_sequence), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) }) .fetch(&mut *self.0) .map(|res| Ok::<_, LoadError>(res??)) .try_collect() .await?; Ok(channels) } pub async fn delete( &mut self, channel: &History, deleted: &Instant, ) -> Result { let id = channel.id(); sqlx::query!( r#" update channel set last_sequence = max(last_sequence, $1) where id = $2 returning id as "id: Id" "#, deleted.sequence, id, ) .fetch_one(&mut *self.0) .await?; sqlx::query!( r#" insert into channel_deleted (id, deleted_at, deleted_sequence) values ($1, $2, $3) "#, id, deleted.at, deleted.sequence, ) .execute(&mut *self.0) .await?; // Small social responsibility hack here: when a channel is deleted, its name is // retconned to have been the empty string. Someone reading the event stream // afterwards, or looking at channels via the API, cannot retrieve the // "deleted" channel's information by ignoring the deletion event. // // This also avoids the need for a separate name reservation table to ensure // that live channels have unique names, since the `channel` table's name field // is unique over non-null values. sqlx::query!( r#" delete from channel_name where id = $1 "#, id, ) .execute(&mut *self.0) .await?; let channel = self.by_id(id).await?; Ok(channel) } pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let channels = sqlx::query_scalar!( r#" with has_messages as ( select channel from message group by channel ) delete from channel_deleted where deleted_at < $1 and id not in has_messages returning id as "id: Id" "#, purge_at, ) .fetch_all(&mut *self.0) .await?; for channel in channels { // Wanted: a way to batch these up into one query. sqlx::query!( r#" delete from channel where id = $1 "#, channel, ) .execute(&mut *self.0) .await?; } Ok(()) } pub async fn expired(&mut self, expired_at: &DateTime) -> Result, LoadError> { let channels = sqlx::query!( r#" select channel.id as "id: Id", name.display_name as "display_name?: String", name.canonical_name as "canonical_name?: String", channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel left join channel_name as name using (id) left join channel_deleted as deleted using (id) left join message on channel.id = message.channel where channel.created_at < $1 and message.id is null and deleted.id is null "#, expired_at, ) .map(|row| { Ok::<_, name::Error>(History { channel: Channel { id: row.id, name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), deleted_at: row.deleted_at, }, created: Instant::new(row.created_at, row.created_sequence), deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) }) .fetch(&mut *self.0) .map(|res| Ok::<_, LoadError>(res??)) .try_collect() .await?; Ok(channels) } } #[derive(Debug, thiserror::Error)] #[error(transparent)] pub enum LoadError { Database(#[from] sqlx::Error), Name(#[from] name::Error), } impl NotFound for Result { type Ok = T; type Error = LoadError; fn optional(self) -> Result, LoadError> { match self { Ok(value) => Ok(Some(value)), Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None), Err(other) => Err(other), } } }