| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-22 19:12:34 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-22 19:12:34 -0400 |
| commit | 6430854352745f45281021c305b4e350bc92d535 (patch) | |
| tree | c6901c22a45e36415f63efe988d4d4f2a309df81 /src/channel | |
| parent | 98af8ff80da919a1126ba7c6afa65e6654b5ecde (diff) | |
| parent | db940bacd096a33a65f29759e70ea1acf6186a67 (diff) | |
Merge branch 'unicode-normalization'
Diffstat (limited to 'src/channel')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/channel/app.rs | 63 |
| -rw-r--r-- | src/channel/repo.rs | 244 |
| -rw-r--r-- | src/channel/routes/channel/post.rs | 4 |
| -rw-r--r-- | src/channel/routes/post.rs | 3 |
| -rw-r--r-- | src/channel/snapshot.rs | 4 |
5 files changed, 229 insertions, 89 deletions
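
This merge replaces plain `String` channel names with a dedicated `Name` type that carries both the display form a user typed and a Unicode-canonicalized form used for uniqueness checks and ordering (stored below as `channel_name.display_name` and `channel_name.canonical_name`). The `Name` type itself lives outside `src/channel`, so it is not part of this diff; the sketch below is only a guess at what its `display()` / `canonical()` split might look like, and the normalization scheme shown (NFKC plus lowercasing via the `unicode-normalization` crate) is an assumption rather than the commit's confirmed behaviour. The real type also provides `Name::optional(...)` and a `name::Error`, which this sketch omits.

```rust
// Hypothetical sketch only: src/name.rs is not included in this diff, and the
// canonicalization shown here (NFKC + lowercasing) is an assumed scheme.
use unicode_normalization::UnicodeNormalization as _;

#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Name {
    display: String,
}

impl Name {
    /// The name exactly as the user entered it.
    pub fn display(&self) -> String {
        self.display.clone()
    }

    /// The form used for uniqueness checks and `order by`:
    /// compatibility-normalized, then case-folded.
    pub fn canonical(&self) -> String {
        self.display.nfkc().collect::<String>().to_lowercase()
    }
}

impl From<String> for Name {
    fn from(display: String) -> Self {
        Self { display }
    }
}
```

Keeping both forms in the database is what makes the new `recanonicalize()` pass below possible: if the canonicalization rules change, `canonical_name` can be rebuilt from the stored `display_name` without touching what users see.
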
```diff
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 75c662d..7bfa0f7 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -2,12 +2,16 @@ use chrono::TimeDelta;
 use itertools::Itertools;
 use sqlx::sqlite::SqlitePool;
 
-use super::{repo::Provider as _, Channel, History, Id};
+use super::{
+    repo::{LoadError, Provider as _},
+    Channel, History, Id,
+};
 use crate::{
     clock::DateTime,
     db::{Duplicate as _, NotFound as _},
     event::{repo::Provider as _, Broadcaster, Event, Sequence},
     message::repo::Provider as _,
+    name::{self, Name},
 };
 
 pub struct Channels<'a> {
@@ -20,14 +24,14 @@ impl<'a> Channels<'a> {
         Self { db, events }
     }
 
-    pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> {
+    pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> {
         let mut tx = self.db.begin().await?;
         let created = tx.sequence().next(created_at).await?;
         let channel = tx
             .channels()
             .create(name, &created)
             .await
-            .duplicate(|| CreateError::DuplicateName(name.into()))?;
+            .duplicate(|| CreateError::DuplicateName(name.clone()))?;
         tx.commit().await?;
 
         self.events
@@ -38,7 +42,7 @@ impl<'a> Channels<'a> {
 
     // This function is careless with respect to time, and gets you the channel as
     // it exists in the specific moment when you call it.
-    pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, sqlx::Error> {
+    pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, Error> {
         let mut tx = self.db.begin().await?;
         let channel = tx.channels().by_id(channel).await.optional()?;
         tx.commit().await?;
@@ -88,7 +92,7 @@ impl<'a> Channels<'a> {
         Ok(())
     }
 
-    pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+    pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> {
         // Somewhat arbitrarily, expire after 90 days.
         let expire_at = relative_to.to_owned() - TimeDelta::days(90);
@@ -129,14 +133,33 @@ impl<'a> Channels<'a> {
 
         Ok(())
     }
+
+    pub async fn recanonicalize(&self) -> Result<(), sqlx::Error> {
+        let mut tx = self.db.begin().await?;
+        tx.channels().recanonicalize().await?;
+        tx.commit().await?;
+
+        Ok(())
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
 pub enum CreateError {
     #[error("channel named {0} already exists")]
-    DuplicateName(String),
+    DuplicateName(Name),
     #[error(transparent)]
     Database(#[from] sqlx::Error),
+    #[error(transparent)]
+    Name(#[from] name::Error),
+}
+
+impl From<LoadError> for CreateError {
+    fn from(error: LoadError) -> Self {
+        match error {
+            LoadError::Database(error) => error.into(),
+            LoadError::Name(error) => error.into(),
+        }
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -147,4 +170,32 @@ pub enum Error {
     #[error(transparent)]
     Deleted(Id),
     #[error(transparent)]
     Database(#[from] sqlx::Error),
+    #[error(transparent)]
+    Name(#[from] name::Error),
+}
+
+impl From<LoadError> for Error {
+    fn from(error: LoadError) -> Self {
+        match error {
+            LoadError::Database(error) => error.into(),
+            LoadError::Name(error) => error.into(),
+        }
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum ExpireError {
+    #[error(transparent)]
+    Database(#[from] sqlx::Error),
+    #[error(transparent)]
+    Name(#[from] name::Error),
+}
+
+impl From<LoadError> for ExpireError {
+    fn from(error: LoadError) -> Self {
+        match error {
+            LoadError::Database(error) => error.into(),
+            LoadError::Name(error) => error.into(),
+        }
+    }
 }
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index 27d35f0..e26ac2b 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -1,9 +1,12 @@
+use futures::stream::{StreamExt as _, TryStreamExt as _};
 use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
 
 use crate::{
     channel::{Channel, History, Id},
     clock::DateTime,
+    db::NotFound,
     event::{Instant, ResumePoint, Sequence},
+    name::{self, Name},
 };
 
 pub trait Provider {
@@ -19,134 +22,162 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
 pub struct Channels<'t>(&'t mut SqliteConnection);
 
 impl<'c> Channels<'c> {
-    pub async fn create(&mut self, name: &str, created: &Instant) -> Result<History, sqlx::Error> {
+    pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> {
         let id = Id::generate();
 
-        let channel = sqlx::query!(
+        let name = name.clone();
+        let display_name = name.display();
+        let canonical_name = name.canonical();
+        let created = *created;
+
+        sqlx::query!(
             r#"
             insert
-            into channel (id, name, created_at, created_sequence)
-            values ($1, $2, $3, $4)
-            returning
-                id as "id: Id",
-                name as "name!", -- known non-null as we just set it
-                created_at as "created_at: DateTime",
-                created_sequence as "created_sequence: Sequence"
+            into channel (id, created_at, created_sequence)
+            values ($1, $2, $3)
             "#,
             id,
-            name,
             created.at,
             created.sequence,
         )
-        .map(|row| History {
+        .execute(&mut *self.0)
+        .await?;
+
+        sqlx::query!(
+            r#"
+            insert into channel_name (id, display_name, canonical_name)
+            values ($1, $2, $3)
+            "#,
+            id,
+            display_name,
+            canonical_name,
+        )
+        .execute(&mut *self.0)
+        .await?;
+
+        let channel = History {
             channel: Channel {
-                id: row.id,
-                name: row.name,
+                id,
+                name: name.clone(),
                 deleted_at: None,
             },
-            created: Instant::new(row.created_at, row.created_sequence),
+            created,
             deleted: None,
-        })
-        .fetch_one(&mut *self.0)
-        .await?;
+        };
 
         Ok(channel)
     }
 
-    pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> {
+    pub async fn by_id(&mut self, channel: &Id) -> Result<History, LoadError> {
         let channel = sqlx::query!(
             r#"
             select
                 id as "id: Id",
-                channel.name,
+                name.display_name as "display_name?: String",
+                name.canonical_name as "canonical_name?: String",
                 channel.created_at as "created_at: DateTime",
                 channel.created_sequence as "created_sequence: Sequence",
                 deleted.deleted_at as "deleted_at?: DateTime",
                 deleted.deleted_sequence as "deleted_sequence?: Sequence"
             from channel
+                left join channel_name as name
+                    using (id)
                 left join channel_deleted as deleted
                    using (id)
             where id = $1
             "#,
             channel,
         )
-        .map(|row| History {
-            channel: Channel {
-                id: row.id,
-                name: row.name.unwrap_or_default(),
-                deleted_at: row.deleted_at,
-            },
-            created: Instant::new(row.created_at, row.created_sequence),
-            deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+        .map(|row| {
+            Ok::<_, name::Error>(History {
+                channel: Channel {
+                    id: row.id,
+                    name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+                    deleted_at: row.deleted_at,
+                },
+                created: Instant::new(row.created_at, row.created_sequence),
+                deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+            })
         })
         .fetch_one(&mut *self.0)
-        .await?;
+        .await??;
 
         Ok(channel)
     }
 
-    pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> {
+    pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> {
         let channels = sqlx::query!(
             r#"
             select
                 id as "id: Id",
-                channel.name,
+                name.display_name as "display_name: String",
+                name.canonical_name as "canonical_name: String",
                 channel.created_at as "created_at: DateTime",
                 channel.created_sequence as "created_sequence: Sequence",
-                deleted.deleted_at as "deleted_at: DateTime",
-                deleted.deleted_sequence as "deleted_sequence: Sequence"
+                deleted.deleted_at as "deleted_at?: DateTime",
+                deleted.deleted_sequence as "deleted_sequence?: Sequence"
             from channel
+                left join channel_name as name
+                    using (id)
                 left join channel_deleted as deleted
                    using (id)
             where coalesce(channel.created_sequence <= $1, true)
-            order by channel.name
+            order by name.canonical_name
            "#,
             resume_at,
         )
-        .map(|row| History {
-            channel: Channel {
-                id: row.id,
-                name: row.name.unwrap_or_default(),
-                deleted_at: row.deleted_at,
-            },
-            created: Instant::new(row.created_at, row.created_sequence),
-            deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+        .map(|row| {
+            Ok::<_, name::Error>(History {
+                channel: Channel {
+                    id: row.id,
+                    name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+                    deleted_at: row.deleted_at,
+                },
+                created: Instant::new(row.created_at, row.created_sequence),
+                deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+            })
         })
-        .fetch_all(&mut *self.0)
+        .fetch(&mut *self.0)
+        .map(|res| Ok::<_, LoadError>(res??))
+        .try_collect()
         .await?;
 
         Ok(channels)
     }
 
-    pub async fn replay(
-        &mut self,
-        resume_at: Option<Sequence>,
-    ) -> Result<Vec<History>, sqlx::Error> {
+    pub async fn replay(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, LoadError> {
         let channels = sqlx::query!(
             r#"
             select
                 id as "id: Id",
-                channel.name,
+                name.display_name as "display_name: String",
+                name.canonical_name as "canonical_name: String",
                 channel.created_at as "created_at: DateTime",
                 channel.created_sequence as "created_sequence: Sequence",
-                deleted.deleted_at as "deleted_at: DateTime",
-                deleted.deleted_sequence as "deleted_sequence: Sequence"
            from channel
+                left join channel_name as name
+                    using (id)
                 left join channel_deleted as deleted
                    using (id)
             where coalesce(channel.created_sequence > $1, true)
             "#,
             resume_at,
         )
-        .map(|row| History {
-            channel: Channel {
-                id: row.id,
-                name: row.name.unwrap_or_default(),
-                deleted_at: row.deleted_at,
-            },
-            created: Instant::new(row.created_at, row.created_sequence),
-            deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+        .map(|row| {
+            Ok::<_, name::Error>(History {
+                channel: Channel {
+                    id: row.id,
+                    name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+                    deleted_at: row.deleted_at,
+                },
+                created: Instant::new(row.created_at, row.created_sequence),
+                deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+            })
         })
-        .fetch_all(&mut *self.0)
+        .fetch(&mut *self.0)
+        .map(|res| Ok::<_, LoadError>(res??))
+        .try_collect()
         .await?;
 
         Ok(channels)
@@ -156,19 +187,18 @@ impl<'c> Channels<'c> {
         &mut self,
         channel: &History,
         deleted: &Instant,
-    ) -> Result<History, sqlx::Error> {
+    ) -> Result<History, LoadError> {
         let id = channel.id();
 
-        sqlx::query_scalar!(
+        sqlx::query!(
             r#"
             insert into channel_deleted (id, deleted_at, deleted_sequence)
             values ($1, $2, $3)
-            returning 1 as "deleted: bool"
             "#,
             id,
             deleted.at,
             deleted.sequence,
         )
-        .fetch_one(&mut *self.0)
+        .execute(&mut *self.0)
         .await?;
 
         // Small social responsibility hack here: when a channel is deleted, its name is
@@ -179,16 +209,14 @@ impl<'c> Channels<'c> {
         // This also avoids the need for a separate name reservation table to ensure
         // that live channels have unique names, since the `channel` table's name field
         // is unique over non-null values.
-        sqlx::query_scalar!(
+        sqlx::query!(
             r#"
-            update channel
-            set name = null
+            delete from channel_name
             where id = $1
-            returning 1 as "updated: bool"
             "#,
             id,
         )
-        .fetch_one(&mut *self.0)
+        .execute(&mut *self.0)
         .await?;
 
         let channel = self.by_id(id).await?;
@@ -230,38 +258,98 @@ impl<'c> Channels<'c> {
         Ok(())
     }
 
-    pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, sqlx::Error> {
+    pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> {
         let channels = sqlx::query!(
             r#"
             select
                 channel.id as "id: Id",
-                channel.name,
+                name.display_name as "display_name: String",
+                name.canonical_name as "canonical_name: String",
                 channel.created_at as "created_at: DateTime",
                 channel.created_sequence as "created_sequence: Sequence",
                 deleted.deleted_at as "deleted_at?: DateTime",
                 deleted.deleted_sequence as "deleted_sequence?: Sequence"
             from channel
+                left join channel_name as name
+                    using (id)
                 left join channel_deleted as deleted
                    using (id)
                 left join message
+                    on channel.id = message.channel
             where channel.created_at < $1
                 and message.id is null
                 and deleted.id is null
             "#,
             expired_at,
         )
-        .map(|row| History {
-            channel: Channel {
-                id: row.id,
-                name: row.name.unwrap_or_default(),
-                deleted_at: row.deleted_at,
-            },
-            created: Instant::new(row.created_at, row.created_sequence),
-            deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+        .map(|row| {
+            Ok::<_, name::Error>(History {
+                channel: Channel {
+                    id: row.id,
+                    name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+                    deleted_at: row.deleted_at,
+                },
+                created: Instant::new(row.created_at, row.created_sequence),
+                deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+            })
         })
-        .fetch_all(&mut *self.0)
+        .fetch(&mut *self.0)
+        .map(|res| Ok::<_, LoadError>(res??))
+        .try_collect()
         .await?;
 
         Ok(channels)
     }
+
+    pub async fn recanonicalize(&mut self) -> Result<(), sqlx::Error> {
+        let channels = sqlx::query!(
+            r#"
+            select
+                id as "id: Id",
+                display_name as "display_name: String"
+            from channel_name
+            "#,
+        )
+        .fetch_all(&mut *self.0)
+        .await?;
+
+        for channel in channels {
+            let name = Name::from(channel.display_name);
+            let canonical_name = name.canonical();
+
+            sqlx::query!(
+                r#"
+                update channel_name
+                set canonical_name = $1
+                where id = $2
+                "#,
+                canonical_name,
+                channel.id,
+            )
+            .execute(&mut *self.0)
+            .await?;
+        }
+
+        Ok(())
+    }
 }
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum LoadError {
+    Database(#[from] sqlx::Error),
+    Name(#[from] name::Error),
+}
+
+impl<T> NotFound for Result<T, LoadError> {
+    type Ok = T;
+    type Error = LoadError;
+
+    fn optional(self) -> Result<Option<T>, LoadError> {
+        match self {
+            Ok(value) => Ok(Some(value)),
+            Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None),
+            Err(other) => Err(other),
+        }
+    }
+}
diff --git a/src/channel/routes/channel/post.rs b/src/channel/routes/channel/post.rs
index b489a77..d0cae05 100644
--- a/src/channel/routes/channel/post.rs
+++ b/src/channel/routes/channel/post.rs
@@ -9,7 +9,7 @@ use crate::{
     clock::RequestedAt,
     error::{Internal, NotFound},
     login::Login,
-    message::{app::SendError, Message},
+    message::{app::SendError, Body, Message},
 };
 
 pub async fn handler(
@@ -29,7 +29,7 @@ pub async fn handler(
 
 #[derive(serde::Deserialize)]
 pub struct Request {
-    pub body: String,
+    pub body: Body,
 }
 
 #[derive(Debug)]
diff --git a/src/channel/routes/post.rs b/src/channel/routes/post.rs
index a05c312..9781dd7 100644
--- a/src/channel/routes/post.rs
+++ b/src/channel/routes/post.rs
@@ -10,6 +10,7 @@ use crate::{
     clock::RequestedAt,
     error::Internal,
     login::Login,
+    name::Name,
 };
 
 pub async fn handler(
@@ -29,7 +30,7 @@ pub async fn handler(
 
 #[derive(serde::Deserialize)]
 pub struct Request {
-    pub name: String,
+    pub name: Name,
 }
 
 #[derive(Debug)]
diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs
index 2b7d89a..129c0d6 100644
--- a/src/channel/snapshot.rs
+++ b/src/channel/snapshot.rs
@@ -2,12 +2,12 @@ use super::{
     event::{Created, Event},
     Id,
 };
-use crate::clock::DateTime;
+use crate::{clock::DateTime, name::Name};
 
 #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
 pub struct Channel {
     pub id: Id,
-    pub name: String,
+    pub name: Name,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub deleted_at: Option<DateTime>,
 }
```
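
One small supporting piece: `Channels::get` in `app.rs` still calls `.optional()?` on the repository result, which now fails with `LoadError` instead of `sqlx::Error`. The `NotFound` helper trait it relies on is defined in `crate::db` and is not shown in this diff; judging from the `impl<T> NotFound for Result<T, LoadError>` added above, its shape is presumably something like the following (an inference, not the actual source):

```rust
// Inferred from the impl added in src/channel/repo.rs; the real trait lives in
// src/db and may differ in naming or detail.
pub trait NotFound {
    type Ok;
    type Error;

    /// Map a "row not found" failure to Ok(None), leaving every other
    /// success or error untouched.
    fn optional(self) -> Result<Option<Self::Ok>, Self::Error>;
}
```

Implementing the trait for `Result<T, LoadError>` keeps `RowNotFound` mapping to `Ok(None)` at the application layer even though the repository's error type has changed.
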
