diff options
Diffstat (limited to 'src/channel/repo.rs')
| -rw-r--r-- | src/channel/repo.rs | 336 |
1 files changed, 0 insertions, 336 deletions
diff --git a/src/channel/repo.rs b/src/channel/repo.rs deleted file mode 100644 index fd2173a..0000000 --- a/src/channel/repo.rs +++ /dev/null @@ -1,336 +0,0 @@ -use futures::stream::{StreamExt as _, TryStreamExt as _}; -use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; - -use crate::{ - channel::{Channel, History, Id}, - clock::DateTime, - db::NotFound, - event::{Instant, Sequence}, - name::{self, Name}, -}; - -pub trait Provider { - fn channels(&mut self) -> Channels; -} - -impl Provider for Transaction<'_, Sqlite> { - fn channels(&mut self) -> Channels { - Channels(self) - } -} - -pub struct Channels<'t>(&'t mut SqliteConnection); - -impl Channels<'_> { - pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> { - let id = Id::generate(); - let name = name.clone(); - let display_name = name.display(); - let canonical_name = name.canonical(); - let created = *created; - - sqlx::query!( - r#" - insert into conversation (id, created_at, created_sequence, last_sequence) - values ($1, $2, $3, $4) - "#, - id, - created.at, - created.sequence, - created.sequence, - ) - .execute(&mut *self.0) - .await?; - - sqlx::query!( - r#" - insert into conversation_name (id, display_name, canonical_name) - values ($1, $2, $3) - "#, - id, - display_name, - canonical_name, - ) - .execute(&mut *self.0) - .await?; - - let channel = History { - channel: Channel { - created, - id, - name: name.clone(), - deleted_at: None, - }, - deleted: None, - }; - - Ok(channel) - } - - pub async fn by_id(&mut self, channel: &Id) -> Result<History, LoadError> { - let channel = sqlx::query!( - r#" - select - id as "id: Id", - name.display_name as "display_name?: String", - name.canonical_name as "canonical_name?: String", - conversation.created_at as "created_at: DateTime", - conversation.created_sequence as "created_sequence: Sequence", - deleted.deleted_at as "deleted_at?: DateTime", - deleted.deleted_sequence as "deleted_sequence?: Sequence" - from conversation - left join conversation_name as name - using (id) - left join conversation_deleted as deleted - using (id) - where id = $1 - "#, - channel, - ) - .map(|row| { - Ok::<_, name::Error>(History { - channel: Channel { - created: Instant::new(row.created_at, row.created_sequence), - id: row.id, - name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), - deleted_at: row.deleted_at, - }, - deleted: Instant::optional(row.deleted_at, row.deleted_sequence), - }) - }) - .fetch_one(&mut *self.0) - .await??; - - Ok(channel) - } - - pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { - let channels = sqlx::query!( - r#" - select - id as "id: Id", - name.display_name as "display_name?: String", - name.canonical_name as "canonical_name?: String", - conversation.created_at as "created_at: DateTime", - conversation.created_sequence as "created_sequence: Sequence", - deleted.deleted_at as "deleted_at?: DateTime", - deleted.deleted_sequence as "deleted_sequence?: Sequence" - from conversation - left join conversation_name as name - using (id) - left join conversation_deleted as deleted - using (id) - where conversation.created_sequence <= $1 - order by name.canonical_name - "#, - resume_at, - ) - .map(|row| { - Ok::<_, name::Error>(History { - channel: Channel { - created: Instant::new(row.created_at, row.created_sequence), - id: row.id, - name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), - deleted_at: row.deleted_at, - }, - deleted: Instant::optional(row.deleted_at, row.deleted_sequence), - }) - }) - .fetch(&mut *self.0) - .map(|res| Ok::<_, LoadError>(res??)) - .try_collect() - .await?; - - Ok(channels) - } - - pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { - let channels = sqlx::query!( - r#" - select - id as "id: Id", - name.display_name as "display_name?: String", - name.canonical_name as "canonical_name?: String", - conversation.created_at as "created_at: DateTime", - conversation.created_sequence as "created_sequence: Sequence", - deleted.deleted_at as "deleted_at?: DateTime", - deleted.deleted_sequence as "deleted_sequence?: Sequence" - from conversation - left join conversation_name as name - using (id) - left join conversation_deleted as deleted - using (id) - where conversation.last_sequence > $1 - "#, - resume_at, - ) - .map(|row| { - Ok::<_, name::Error>(History { - channel: Channel { - created: Instant::new(row.created_at, row.created_sequence), - id: row.id, - name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), - deleted_at: row.deleted_at, - }, - deleted: Instant::optional(row.deleted_at, row.deleted_sequence), - }) - }) - .fetch(&mut *self.0) - .map(|res| Ok::<_, LoadError>(res??)) - .try_collect() - .await?; - - Ok(channels) - } - - pub async fn delete( - &mut self, - channel: &History, - deleted: &Instant, - ) -> Result<History, LoadError> { - let id = channel.id(); - sqlx::query!( - r#" - update conversation - set last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - id, - ) - .fetch_one(&mut *self.0) - .await?; - - sqlx::query!( - r#" - insert into conversation_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a conversation is deleted, its - // name is retconned to have been the empty string. Someone reading the event - // stream afterwards, or looking at channels via the API, cannot retrieve the - // "deleted" channel's information by ignoring the deletion event. - // - // This also avoids the need for a separate name reservation table to ensure - // that live channels have unique names, since the `channel` table's name field - // is unique over non-null values. - sqlx::query!( - r#" - delete from conversation_name - where id = $1 - "#, - id, - ) - .execute(&mut *self.0) - .await?; - - let channel = self.by_id(id).await?; - - Ok(channel) - } - - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { - let channels = sqlx::query_scalar!( - r#" - with has_messages as ( - select conversation - from message - group by conversation - ) - delete from conversation_deleted - where deleted_at < $1 - and id not in has_messages - returning id as "id: Id" - "#, - purge_at, - ) - .fetch_all(&mut *self.0) - .await?; - - for channel in channels { - // Wanted: a way to batch these up into one query. - sqlx::query!( - r#" - delete from conversation - where id = $1 - "#, - channel, - ) - .execute(&mut *self.0) - .await?; - } - - Ok(()) - } - - pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> { - let channels = sqlx::query!( - r#" - select - conversation.id as "id: Id", - name.display_name as "display_name?: String", - name.canonical_name as "canonical_name?: String", - conversation.created_at as "created_at: DateTime", - conversation.created_sequence as "created_sequence: Sequence", - deleted.deleted_at as "deleted_at?: DateTime", - deleted.deleted_sequence as "deleted_sequence?: Sequence" - from conversation - left join conversation_name as name - using (id) - left join conversation_deleted as deleted - using (id) - left join message - on conversation.id = message.conversation - where conversation.created_at < $1 - and message.id is null - and deleted.id is null - "#, - expired_at, - ) - .map(|row| { - Ok::<_, name::Error>(History { - channel: Channel { - created: Instant::new(row.created_at, row.created_sequence), - id: row.id, - name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), - deleted_at: row.deleted_at, - }, - deleted: Instant::optional(row.deleted_at, row.deleted_sequence), - }) - }) - .fetch(&mut *self.0) - .map(|res| Ok::<_, LoadError>(res??)) - .try_collect() - .await?; - - Ok(channels) - } -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum LoadError { - Database(#[from] sqlx::Error), - Name(#[from] name::Error), -} - -impl<T> NotFound for Result<T, LoadError> { - type Ok = T; - type Error = LoadError; - - fn optional(self) -> Result<Option<T>, LoadError> { - match self { - Ok(value) => Ok(Some(value)), - Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None), - Err(other) => Err(other), - } - } -} |
