summaryrefslogtreecommitdiff
path: root/src/channel/repo.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel/repo.rs')
-rw-r--r--src/channel/repo.rs336
1 files changed, 0 insertions, 336 deletions
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
deleted file mode 100644
index fd2173a..0000000
--- a/src/channel/repo.rs
+++ /dev/null
@@ -1,336 +0,0 @@
-use futures::stream::{StreamExt as _, TryStreamExt as _};
-use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
-
-use crate::{
- channel::{Channel, History, Id},
- clock::DateTime,
- db::NotFound,
- event::{Instant, Sequence},
- name::{self, Name},
-};
-
-pub trait Provider {
- fn channels(&mut self) -> Channels;
-}
-
-impl Provider for Transaction<'_, Sqlite> {
- fn channels(&mut self) -> Channels {
- Channels(self)
- }
-}
-
-pub struct Channels<'t>(&'t mut SqliteConnection);
-
-impl Channels<'_> {
- pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> {
- let id = Id::generate();
- let name = name.clone();
- let display_name = name.display();
- let canonical_name = name.canonical();
- let created = *created;
-
- sqlx::query!(
- r#"
- insert into conversation (id, created_at, created_sequence, last_sequence)
- values ($1, $2, $3, $4)
- "#,
- id,
- created.at,
- created.sequence,
- created.sequence,
- )
- .execute(&mut *self.0)
- .await?;
-
- sqlx::query!(
- r#"
- insert into conversation_name (id, display_name, canonical_name)
- values ($1, $2, $3)
- "#,
- id,
- display_name,
- canonical_name,
- )
- .execute(&mut *self.0)
- .await?;
-
- let channel = History {
- channel: Channel {
- created,
- id,
- name: name.clone(),
- deleted_at: None,
- },
- deleted: None,
- };
-
- Ok(channel)
- }
-
- pub async fn by_id(&mut self, channel: &Id) -> Result<History, LoadError> {
- let channel = sqlx::query!(
- r#"
- select
- id as "id: Id",
- name.display_name as "display_name?: String",
- name.canonical_name as "canonical_name?: String",
- conversation.created_at as "created_at: DateTime",
- conversation.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at?: DateTime",
- deleted.deleted_sequence as "deleted_sequence?: Sequence"
- from conversation
- left join conversation_name as name
- using (id)
- left join conversation_deleted as deleted
- using (id)
- where id = $1
- "#,
- channel,
- )
- .map(|row| {
- Ok::<_, name::Error>(History {
- channel: Channel {
- created: Instant::new(row.created_at, row.created_sequence),
- id: row.id,
- name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
- })
- })
- .fetch_one(&mut *self.0)
- .await??;
-
- Ok(channel)
- }
-
- pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
- let channels = sqlx::query!(
- r#"
- select
- id as "id: Id",
- name.display_name as "display_name?: String",
- name.canonical_name as "canonical_name?: String",
- conversation.created_at as "created_at: DateTime",
- conversation.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at?: DateTime",
- deleted.deleted_sequence as "deleted_sequence?: Sequence"
- from conversation
- left join conversation_name as name
- using (id)
- left join conversation_deleted as deleted
- using (id)
- where conversation.created_sequence <= $1
- order by name.canonical_name
- "#,
- resume_at,
- )
- .map(|row| {
- Ok::<_, name::Error>(History {
- channel: Channel {
- created: Instant::new(row.created_at, row.created_sequence),
- id: row.id,
- name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
- })
- })
- .fetch(&mut *self.0)
- .map(|res| Ok::<_, LoadError>(res??))
- .try_collect()
- .await?;
-
- Ok(channels)
- }
-
- pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
- let channels = sqlx::query!(
- r#"
- select
- id as "id: Id",
- name.display_name as "display_name?: String",
- name.canonical_name as "canonical_name?: String",
- conversation.created_at as "created_at: DateTime",
- conversation.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at?: DateTime",
- deleted.deleted_sequence as "deleted_sequence?: Sequence"
- from conversation
- left join conversation_name as name
- using (id)
- left join conversation_deleted as deleted
- using (id)
- where conversation.last_sequence > $1
- "#,
- resume_at,
- )
- .map(|row| {
- Ok::<_, name::Error>(History {
- channel: Channel {
- created: Instant::new(row.created_at, row.created_sequence),
- id: row.id,
- name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
- })
- })
- .fetch(&mut *self.0)
- .map(|res| Ok::<_, LoadError>(res??))
- .try_collect()
- .await?;
-
- Ok(channels)
- }
-
- pub async fn delete(
- &mut self,
- channel: &History,
- deleted: &Instant,
- ) -> Result<History, LoadError> {
- let id = channel.id();
- sqlx::query!(
- r#"
- update conversation
- set last_sequence = max(last_sequence, $1)
- where id = $2
- returning id as "id: Id"
- "#,
- deleted.sequence,
- id,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- sqlx::query!(
- r#"
- insert into conversation_deleted (id, deleted_at, deleted_sequence)
- values ($1, $2, $3)
- "#,
- id,
- deleted.at,
- deleted.sequence,
- )
- .execute(&mut *self.0)
- .await?;
-
- // Small social responsibility hack here: when a conversation is deleted, its
- // name is retconned to have been the empty string. Someone reading the event
- // stream afterwards, or looking at channels via the API, cannot retrieve the
- // "deleted" channel's information by ignoring the deletion event.
- //
- // This also avoids the need for a separate name reservation table to ensure
- // that live channels have unique names, since the `channel` table's name field
- // is unique over non-null values.
- sqlx::query!(
- r#"
- delete from conversation_name
- where id = $1
- "#,
- id,
- )
- .execute(&mut *self.0)
- .await?;
-
- let channel = self.by_id(id).await?;
-
- Ok(channel)
- }
-
- pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
- let channels = sqlx::query_scalar!(
- r#"
- with has_messages as (
- select conversation
- from message
- group by conversation
- )
- delete from conversation_deleted
- where deleted_at < $1
- and id not in has_messages
- returning id as "id: Id"
- "#,
- purge_at,
- )
- .fetch_all(&mut *self.0)
- .await?;
-
- for channel in channels {
- // Wanted: a way to batch these up into one query.
- sqlx::query!(
- r#"
- delete from conversation
- where id = $1
- "#,
- channel,
- )
- .execute(&mut *self.0)
- .await?;
- }
-
- Ok(())
- }
-
- pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> {
- let channels = sqlx::query!(
- r#"
- select
- conversation.id as "id: Id",
- name.display_name as "display_name?: String",
- name.canonical_name as "canonical_name?: String",
- conversation.created_at as "created_at: DateTime",
- conversation.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at?: DateTime",
- deleted.deleted_sequence as "deleted_sequence?: Sequence"
- from conversation
- left join conversation_name as name
- using (id)
- left join conversation_deleted as deleted
- using (id)
- left join message
- on conversation.id = message.conversation
- where conversation.created_at < $1
- and message.id is null
- and deleted.id is null
- "#,
- expired_at,
- )
- .map(|row| {
- Ok::<_, name::Error>(History {
- channel: Channel {
- created: Instant::new(row.created_at, row.created_sequence),
- id: row.id,
- name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
- })
- })
- .fetch(&mut *self.0)
- .map(|res| Ok::<_, LoadError>(res??))
- .try_collect()
- .await?;
-
- Ok(channels)
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub enum LoadError {
- Database(#[from] sqlx::Error),
- Name(#[from] name::Error),
-}
-
-impl<T> NotFound for Result<T, LoadError> {
- type Ok = T;
- type Error = LoadError;
-
- fn optional(self) -> Result<Option<T>, LoadError> {
- match self {
- Ok(value) => Ok(Some(value)),
- Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None),
- Err(other) => Err(other),
- }
- }
-}