summaryrefslogtreecommitdiff
path: root/src/conversation/repo.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/conversation/repo.rs')
-rw-r--r--src/conversation/repo.rs332
1 files changed, 332 insertions, 0 deletions
diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs
new file mode 100644
index 0000000..82b5f01
--- /dev/null
+++ b/src/conversation/repo.rs
@@ -0,0 +1,332 @@
+use futures::stream::{StreamExt as _, TryStreamExt as _};
+use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
+
+use crate::{
+ clock::DateTime,
+ conversation::{Conversation, History, Id},
+ db::NotFound,
+ event::{Instant, Sequence},
+ name::{self, Name},
+};
+
+pub trait Provider {
+ fn conversations(&mut self) -> Conversations;
+}
+
+impl Provider for Transaction<'_, Sqlite> {
+ fn conversations(&mut self) -> Conversations {
+ Conversations(self)
+ }
+}
+
+pub struct Conversations<'t>(&'t mut SqliteConnection);
+
+impl Conversations<'_> {
+ pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> {
+ let id = Id::generate();
+ let name = name.clone();
+ let display_name = name.display();
+ let canonical_name = name.canonical();
+ let created = *created;
+
+ sqlx::query!(
+ r#"
+ insert into conversation (id, created_at, created_sequence, last_sequence)
+ values ($1, $2, $3, $4)
+ "#,
+ id,
+ created.at,
+ created.sequence,
+ created.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ insert into conversation_name (id, display_name, canonical_name)
+ values ($1, $2, $3)
+ "#,
+ id,
+ display_name,
+ canonical_name,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ let conversation = History {
+ conversation: Conversation {
+ created,
+ id,
+ name: name.clone(),
+ deleted_at: None,
+ },
+ deleted: None,
+ };
+
+ Ok(conversation)
+ }
+
+ pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> {
+ let conversation = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ conversation.created_at as "created_at: DateTime",
+ conversation.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
+ from conversation
+ left join conversation_name as name
+ using (id)
+ left join conversation_deleted as deleted
+ using (id)
+ where id = $1
+ "#,
+ conversation,
+ )
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ conversation: Conversation {
+ created: Instant::new(row.created_at, row.created_sequence),
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch_one(&mut *self.0)
+ .await??;
+
+ Ok(conversation)
+ }
+
+ pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
+ let conversations = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ conversation.created_at as "created_at: DateTime",
+ conversation.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
+ from conversation
+ left join conversation_name as name
+ using (id)
+ left join conversation_deleted as deleted
+ using (id)
+ where conversation.created_sequence <= $1
+ order by name.canonical_name
+ "#,
+ resume_at,
+ )
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ conversation: Conversation {
+ created: Instant::new(row.created_at, row.created_sequence),
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
+ .await?;
+
+ Ok(conversations)
+ }
+
+ pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
+ let conversations = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ conversation.created_at as "created_at: DateTime",
+ conversation.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
+ from conversation
+ left join conversation_name as name
+ using (id)
+ left join conversation_deleted as deleted
+ using (id)
+ where conversation.last_sequence > $1
+ "#,
+ resume_at,
+ )
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ conversation: Conversation {
+ created: Instant::new(row.created_at, row.created_sequence),
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
+ .await?;
+
+ Ok(conversations)
+ }
+
+ pub async fn delete(
+ &mut self,
+ conversation: &History,
+ deleted: &Instant,
+ ) -> Result<History, LoadError> {
+ let id = conversation.id();
+ sqlx::query!(
+ r#"
+ update conversation
+ set last_sequence = max(last_sequence, $1)
+ where id = $2
+ returning id as "id: Id"
+ "#,
+ deleted.sequence,
+ id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ insert into conversation_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ "#,
+ id,
+ deleted.at,
+ deleted.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a conversation is deleted, its
+ // name is retconned to have been the empty string. Someone reading the event
+ // stream afterwards, or looking at conversations via the API, cannot retrieve
+ // the "deleted" conversation's information by ignoring the deletion event.
+ sqlx::query!(
+ r#"
+ delete from conversation_name
+ where id = $1
+ "#,
+ id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ let conversation = self.by_id(id).await?;
+
+ Ok(conversation)
+ }
+
+ pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
+ let conversations = sqlx::query_scalar!(
+ r#"
+ with has_messages as (
+ select conversation
+ from message
+ group by conversation
+ )
+ delete from conversation_deleted
+ where deleted_at < $1
+ and id not in has_messages
+ returning id as "id: Id"
+ "#,
+ purge_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ for conversation in conversations {
+ // Wanted: a way to batch these up into one query.
+ sqlx::query!(
+ r#"
+ delete from conversation
+ where id = $1
+ "#,
+ conversation,
+ )
+ .execute(&mut *self.0)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> {
+ let conversations = sqlx::query!(
+ r#"
+ select
+ conversation.id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ conversation.created_at as "created_at: DateTime",
+ conversation.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
+ from conversation
+ left join conversation_name as name
+ using (id)
+ left join conversation_deleted as deleted
+ using (id)
+ left join message
+ on conversation.id = message.conversation
+ where conversation.created_at < $1
+ and message.id is null
+ and deleted.id is null
+ "#,
+ expired_at,
+ )
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ conversation: Conversation {
+ created: Instant::new(row.created_at, row.created_sequence),
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
+ .await?;
+
+ Ok(conversations)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum LoadError {
+ Database(#[from] sqlx::Error),
+ Name(#[from] name::Error),
+}
+
+impl<T> NotFound for Result<T, LoadError> {
+ type Ok = T;
+ type Error = LoadError;
+
+ fn optional(self) -> Result<Option<T>, LoadError> {
+ match self {
+ Ok(value) => Ok(Some(value)),
+ Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None),
+ Err(other) => Err(other),
+ }
+ }
+}