use std::fmt; use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use super::sequence::Sequence; use crate::{ clock::DateTime, events::types::{self}, id::Id as BaseId, }; pub trait Provider { fn channels(&mut self) -> Channels; } impl<'c> Provider for Transaction<'c, Sqlite> { fn channels(&mut self) -> Channels { Channels(self) } } pub struct Channels<'t>(&'t mut SqliteConnection); #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Channel { pub id: Id, pub name: String, #[serde(skip)] pub created_at: DateTime, #[serde(skip)] pub created_sequence: Sequence, } impl<'c> Channels<'c> { pub async fn create( &mut self, name: &str, created_at: &DateTime, created_sequence: Sequence, ) -> Result { let id = Id::generate(); let channel = sqlx::query_as!( Channel, r#" insert into channel (id, name, created_at, created_sequence) values ($1, $2, $3, $4) returning id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" "#, id, name, created_at, created_sequence, ) .fetch_one(&mut *self.0) .await?; Ok(channel) } pub async fn by_id(&mut self, channel: &Id) -> Result { let channel = sqlx::query_as!( Channel, r#" select id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" from channel where id = $1 "#, channel, ) .fetch_one(&mut *self.0) .await?; Ok(channel) } pub async fn all( &mut self, resume_point: Option, ) -> Result, sqlx::Error> { let channels = sqlx::query_as!( Channel, r#" select id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" from channel where coalesce(created_sequence <= $1, true) order by channel.name "#, resume_point, ) .fetch_all(&mut *self.0) .await?; Ok(channels) } pub async fn replay( &mut self, resume_at: Option, ) -> Result, sqlx::Error> { let channels = sqlx::query_as!( Channel, r#" select id as "id: Id", name, created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" from channel where coalesce(created_sequence > $1, true) "#, resume_at, ) .fetch_all(&mut *self.0) .await?; Ok(channels) } pub async fn delete( &mut self, channel: &Channel, deleted_at: &DateTime, deleted_sequence: Sequence, ) -> Result { let channel = channel.id.clone(); sqlx::query_scalar!( r#" delete from channel where id = $1 returning 1 as "row: i64" "#, channel, ) .fetch_one(&mut *self.0) .await?; Ok(types::ChannelEvent { sequence: deleted_sequence, at: *deleted_at, data: types::DeletedEvent { channel }.into(), }) } pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { let channels = sqlx::query_as!( Channel, r#" select channel.id as "id: Id", channel.name, channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence" from channel left join message where created_at < $1 and message.id is null "#, expired_at, ) .fetch_all(&mut *self.0) .await?; Ok(channels) } } // Stable identifier for a [Channel]. Prefixed with `C`. #[derive( Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, sqlx::Type, serde::Deserialize, serde::Serialize, )] #[sqlx(transparent)] #[serde(transparent)] pub struct Id(BaseId); impl From for Id { fn from(id: BaseId) -> Self { Self(id) } } impl Id { pub fn generate() -> Self { BaseId::generate("C") } } impl fmt::Display for Id { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) } }