summaryrefslogtreecommitdiff
path: root/src/conversation/repo.rs
diff options
context:
space:
mode:
authorojacobson <ojacobson@noreply.codeberg.org>2025-07-04 05:00:21 +0200
committerojacobson <ojacobson@noreply.codeberg.org>2025-07-04 05:00:21 +0200
commitc35be3ae29e77983f013c01260dda20208175f2b (patch)
treeabf0b9d993ef03a53903aae03f375b78473952da /src/conversation/repo.rs
parent981cd3c0f4cf912c1d91ee5d9c39f5c1aa7afecf (diff)
parent9b38cb1a62ede4900fde4ba47a7b065db329e994 (diff)
Rename "channels" to "conversations."
The term "channel" for a conversational container has a long and storied history, but is mostly evocative of IRC and of other, ah, "nerd-centric" services. It does show up in more widespread contexts: Discord and Slack both refer to their primary conversational containers as "channels," for example. However, I think it's unnecessary jargon, and I'd like to do away with it. To that end, this change pervasively changes one term to the other wherever it appears, with the following exceptions: * A `channel` concept (unrelated to conversations) is also provided by an external library; we can't and shouldn't try to rename that. * The code to deal with the `pilcrow:channelData` and `pilcrow:lastActiveChannel` local storage properties is still present, to migrate existing data to new keys. It will be removed in a later change. This is a **breaking API change**. As we are not yet managing any API compatibility promises, this is formally not an issue, but it is something to be aware of practically. The major API changes are: * Paths beginning with `/api/channels` are now under `/api/conversations`, without other modifications. * Fields labelled with `channel…` terms are now labelled with `conversation…` terms. For example, a `message` `sent` event is now sent to a `conversation`, not a `channel`. This is also a **breaking UI change**. Specifically, any saved paths for `/ch/CHANNELID` will now lead to a 404. The corresponding paths are `/c/CONVERSATIONID`. While I've made an effort to migrate the location of stored data, I have not tried to provide adapters to fix this specific issue, because the disruption is short-lived and very easily addressed by opening a channel in the client UI. This change is obnoxiously large and difficult to review, for which I apologize. If this shows up in `git annotate`, please forgive me. These kinds of renamings are hard to carry out without a major disruption, especially when the concept ("channel" in this case) is used so pervasively throughout the system. I think it's worth making this change that pervasively so that we don't have an indefinitely-long tail of "well, it's a conversation in the docs, but the table is called `channel` for historical reasons" type issues. Merges conversations-not-channels into main.
Diffstat (limited to 'src/conversation/repo.rs')
-rw-r--r--src/conversation/repo.rs332
1 files changed, 332 insertions, 0 deletions
diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs
new file mode 100644
index 0000000..82b5f01
--- /dev/null
+++ b/src/conversation/repo.rs
@@ -0,0 +1,332 @@
+use futures::stream::{StreamExt as _, TryStreamExt as _};
+use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
+
+use crate::{
+ clock::DateTime,
+ conversation::{Conversation, History, Id},
+ db::NotFound,
+ event::{Instant, Sequence},
+ name::{self, Name},
+};
+
+pub trait Provider {
+ fn conversations(&mut self) -> Conversations;
+}
+
+impl Provider for Transaction<'_, Sqlite> {
+ fn conversations(&mut self) -> Conversations {
+ Conversations(self)
+ }
+}
+
+pub struct Conversations<'t>(&'t mut SqliteConnection);
+
+impl Conversations<'_> {
+ pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> {
+ let id = Id::generate();
+ let name = name.clone();
+ let display_name = name.display();
+ let canonical_name = name.canonical();
+ let created = *created;
+
+ sqlx::query!(
+ r#"
+ insert into conversation (id, created_at, created_sequence, last_sequence)
+ values ($1, $2, $3, $4)
+ "#,
+ id,
+ created.at,
+ created.sequence,
+ created.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ insert into conversation_name (id, display_name, canonical_name)
+ values ($1, $2, $3)
+ "#,
+ id,
+ display_name,
+ canonical_name,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ let conversation = History {
+ conversation: Conversation {
+ created,
+ id,
+ name: name.clone(),
+ deleted_at: None,
+ },
+ deleted: None,
+ };
+
+ Ok(conversation)
+ }
+
+ pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> {
+ let conversation = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ conversation.created_at as "created_at: DateTime",
+ conversation.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
+ from conversation
+ left join conversation_name as name
+ using (id)
+ left join conversation_deleted as deleted
+ using (id)
+ where id = $1
+ "#,
+ conversation,
+ )
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ conversation: Conversation {
+ created: Instant::new(row.created_at, row.created_sequence),
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch_one(&mut *self.0)
+ .await??;
+
+ Ok(conversation)
+ }
+
+ pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
+ let conversations = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ conversation.created_at as "created_at: DateTime",
+ conversation.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
+ from conversation
+ left join conversation_name as name
+ using (id)
+ left join conversation_deleted as deleted
+ using (id)
+ where conversation.created_sequence <= $1
+ order by name.canonical_name
+ "#,
+ resume_at,
+ )
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ conversation: Conversation {
+ created: Instant::new(row.created_at, row.created_sequence),
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
+ .await?;
+
+ Ok(conversations)
+ }
+
+ pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
+ let conversations = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ conversation.created_at as "created_at: DateTime",
+ conversation.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
+ from conversation
+ left join conversation_name as name
+ using (id)
+ left join conversation_deleted as deleted
+ using (id)
+ where conversation.last_sequence > $1
+ "#,
+ resume_at,
+ )
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ conversation: Conversation {
+ created: Instant::new(row.created_at, row.created_sequence),
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
+ .await?;
+
+ Ok(conversations)
+ }
+
+ pub async fn delete(
+ &mut self,
+ conversation: &History,
+ deleted: &Instant,
+ ) -> Result<History, LoadError> {
+ let id = conversation.id();
+ sqlx::query!(
+ r#"
+ update conversation
+ set last_sequence = max(last_sequence, $1)
+ where id = $2
+ returning id as "id: Id"
+ "#,
+ deleted.sequence,
+ id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ insert into conversation_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ "#,
+ id,
+ deleted.at,
+ deleted.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a conversation is deleted, its
+ // name is retconned to have been the empty string. Someone reading the event
+ // stream afterwards, or looking at conversations via the API, cannot retrieve
+ // the "deleted" conversation's information by ignoring the deletion event.
+ sqlx::query!(
+ r#"
+ delete from conversation_name
+ where id = $1
+ "#,
+ id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ let conversation = self.by_id(id).await?;
+
+ Ok(conversation)
+ }
+
+ pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
+ let conversations = sqlx::query_scalar!(
+ r#"
+ with has_messages as (
+ select conversation
+ from message
+ group by conversation
+ )
+ delete from conversation_deleted
+ where deleted_at < $1
+ and id not in has_messages
+ returning id as "id: Id"
+ "#,
+ purge_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ for conversation in conversations {
+ // Wanted: a way to batch these up into one query.
+ sqlx::query!(
+ r#"
+ delete from conversation
+ where id = $1
+ "#,
+ conversation,
+ )
+ .execute(&mut *self.0)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> {
+ let conversations = sqlx::query!(
+ r#"
+ select
+ conversation.id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ conversation.created_at as "created_at: DateTime",
+ conversation.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
+ from conversation
+ left join conversation_name as name
+ using (id)
+ left join conversation_deleted as deleted
+ using (id)
+ left join message
+ on conversation.id = message.conversation
+ where conversation.created_at < $1
+ and message.id is null
+ and deleted.id is null
+ "#,
+ expired_at,
+ )
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ conversation: Conversation {
+ created: Instant::new(row.created_at, row.created_sequence),
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
+ .await?;
+
+ Ok(conversations)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum LoadError {
+ Database(#[from] sqlx::Error),
+ Name(#[from] name::Error),
+}
+
+impl<T> NotFound for Result<T, LoadError> {
+ type Ok = T;
+ type Error = LoadError;
+
+ fn optional(self) -> Result<Option<T>, LoadError> {
+ match self {
+ Ok(value) => Ok(Some(value)),
+ Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None),
+ Err(other) => Err(other),
+ }
+ }
+}