summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs63
-rw-r--r--src/channel/repo.rs244
-rw-r--r--src/channel/routes/channel/post.rs4
-rw-r--r--src/channel/routes/post.rs3
-rw-r--r--src/channel/snapshot.rs4
5 files changed, 229 insertions, 89 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 75c662d..7bfa0f7 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -2,12 +2,16 @@ use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
-use super::{repo::Provider as _, Channel, History, Id};
+use super::{
+ repo::{LoadError, Provider as _},
+ Channel, History, Id,
+};
use crate::{
clock::DateTime,
db::{Duplicate as _, NotFound as _},
event::{repo::Provider as _, Broadcaster, Event, Sequence},
message::repo::Provider as _,
+ name::{self, Name},
};
pub struct Channels<'a> {
@@ -20,14 +24,14 @@ impl<'a> Channels<'a> {
Self { db, events }
}
- pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> {
+ pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> {
let mut tx = self.db.begin().await?;
let created = tx.sequence().next(created_at).await?;
let channel = tx
.channels()
.create(name, &created)
.await
- .duplicate(|| CreateError::DuplicateName(name.into()))?;
+ .duplicate(|| CreateError::DuplicateName(name.clone()))?;
tx.commit().await?;
self.events
@@ -38,7 +42,7 @@ impl<'a> Channels<'a> {
// This function is careless with respect to time, and gets you the channel as
// it exists in the specific moment when you call it.
- pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, sqlx::Error> {
+ pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, Error> {
let mut tx = self.db.begin().await?;
let channel = tx.channels().by_id(channel).await.optional()?;
tx.commit().await?;
@@ -88,7 +92,7 @@ impl<'a> Channels<'a> {
Ok(())
}
- pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = relative_to.to_owned() - TimeDelta::days(90);
@@ -129,14 +133,33 @@ impl<'a> Channels<'a> {
Ok(())
}
+
+ pub async fn recanonicalize(&self) -> Result<(), sqlx::Error> {
+ let mut tx = self.db.begin().await?;
+ tx.channels().recanonicalize().await?;
+ tx.commit().await?;
+
+ Ok(())
+ }
}
#[derive(Debug, thiserror::Error)]
pub enum CreateError {
#[error("channel named {0} already exists")]
- DuplicateName(String),
+ DuplicateName(Name),
#[error(transparent)]
Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Name(#[from] name::Error),
+}
+
+impl From<LoadError> for CreateError {
+ fn from(error: LoadError) -> Self {
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
}
#[derive(Debug, thiserror::Error)]
@@ -147,4 +170,32 @@ pub enum Error {
Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Name(#[from] name::Error),
+}
+
+impl From<LoadError> for Error {
+ fn from(error: LoadError) -> Self {
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum ExpireError {
+ #[error(transparent)]
+ Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Name(#[from] name::Error),
+}
+
+impl From<LoadError> for ExpireError {
+ fn from(error: LoadError) -> Self {
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
}
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index 27d35f0..e26ac2b 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -1,9 +1,12 @@
+use futures::stream::{StreamExt as _, TryStreamExt as _};
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use crate::{
channel::{Channel, History, Id},
clock::DateTime,
+ db::NotFound,
event::{Instant, ResumePoint, Sequence},
+ name::{self, Name},
};
pub trait Provider {
@@ -19,134 +22,162 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Channels<'t>(&'t mut SqliteConnection);
impl<'c> Channels<'c> {
- pub async fn create(&mut self, name: &str, created: &Instant) -> Result<History, sqlx::Error> {
+ pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> {
let id = Id::generate();
- let channel = sqlx::query!(
+ let name = name.clone();
+ let display_name = name.display();
+ let canonical_name = name.canonical();
+ let created = *created;
+
+ sqlx::query!(
r#"
insert
- into channel (id, name, created_at, created_sequence)
- values ($1, $2, $3, $4)
- returning
- id as "id: Id",
- name as "name!", -- known non-null as we just set it
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ into channel (id, created_at, created_sequence)
+ values ($1, $2, $3)
"#,
id,
- name,
created.at,
created.sequence,
)
- .map(|row| History {
+ .execute(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ insert into channel_name (id, display_name, canonical_name)
+ values ($1, $2, $3)
+ "#,
+ id,
+ display_name,
+ canonical_name,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ let channel = History {
channel: Channel {
- id: row.id,
- name: row.name,
+ id,
+ name: name.clone(),
deleted_at: None,
},
- created: Instant::new(row.created_at, row.created_sequence),
+ created,
deleted: None,
- })
- .fetch_one(&mut *self.0)
- .await?;
+ };
Ok(channel)
}
- pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> {
+ pub async fn by_id(&mut self, channel: &Id) -> Result<History, LoadError> {
let channel = sqlx::query!(
r#"
select
id as "id: Id",
- channel.name,
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
channel.created_at as "created_at: DateTime",
channel.created_sequence as "created_sequence: Sequence",
deleted.deleted_at as "deleted_at?: DateTime",
deleted.deleted_sequence as "deleted_sequence?: Sequence"
from channel
+ left join channel_name as name
+ using (id)
left join channel_deleted as deleted
using (id)
where id = $1
"#,
channel,
)
- .map(|row| History {
- channel: Channel {
- id: row.id,
- name: row.name.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- created: Instant::new(row.created_at, row.created_sequence),
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ channel: Channel {
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
})
.fetch_one(&mut *self.0)
- .await?;
+ .await??;
Ok(channel)
}
- pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> {
let channels = sqlx::query!(
r#"
select
id as "id: Id",
- channel.name,
+ name.display_name as "display_name: String",
+ name.canonical_name as "canonical_name: String",
channel.created_at as "created_at: DateTime",
channel.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at: DateTime",
- deleted.deleted_sequence as "deleted_sequence: Sequence"
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
from channel
+ left join channel_name as name
+ using (id)
left join channel_deleted as deleted
using (id)
where coalesce(channel.created_sequence <= $1, true)
- order by channel.name
+ order by name.canonical_name
"#,
resume_at,
)
- .map(|row| History {
- channel: Channel {
- id: row.id,
- name: row.name.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- created: Instant::new(row.created_at, row.created_sequence),
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ channel: Channel {
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
})
- .fetch_all(&mut *self.0)
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
.await?;
Ok(channels)
}
- pub async fn replay(
- &mut self,
- resume_at: Option<Sequence>,
- ) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn replay(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, LoadError> {
let channels = sqlx::query!(
r#"
select
id as "id: Id",
- channel.name,
+ name.display_name as "display_name: String",
+ name.canonical_name as "canonical_name: String",
channel.created_at as "created_at: DateTime",
channel.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at: DateTime",
- deleted.deleted_sequence as "deleted_sequence: Sequence"
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
from channel
+ left join channel_name as name
+ using (id)
left join channel_deleted as deleted
using (id)
where coalesce(channel.created_sequence > $1, true)
"#,
resume_at,
)
- .map(|row| History {
- channel: Channel {
- id: row.id,
- name: row.name.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- created: Instant::new(row.created_at, row.created_sequence),
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ channel: Channel {
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
})
- .fetch_all(&mut *self.0)
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
.await?;
Ok(channels)
@@ -156,19 +187,18 @@ impl<'c> Channels<'c> {
&mut self,
channel: &History,
deleted: &Instant,
- ) -> Result<History, sqlx::Error> {
+ ) -> Result<History, LoadError> {
let id = channel.id();
- sqlx::query_scalar!(
+ sqlx::query!(
r#"
insert into channel_deleted (id, deleted_at, deleted_sequence)
values ($1, $2, $3)
- returning 1 as "deleted: bool"
"#,
id,
deleted.at,
deleted.sequence,
)
- .fetch_one(&mut *self.0)
+ .execute(&mut *self.0)
.await?;
// Small social responsibility hack here: when a channel is deleted, its name is
@@ -179,16 +209,14 @@ impl<'c> Channels<'c> {
// This also avoids the need for a separate name reservation table to ensure
// that live channels have unique names, since the `channel` table's name field
// is unique over non-null values.
- sqlx::query_scalar!(
+ sqlx::query!(
r#"
- update channel
- set name = null
+ delete from channel_name
where id = $1
- returning 1 as "updated: bool"
"#,
id,
)
- .fetch_one(&mut *self.0)
+ .execute(&mut *self.0)
.await?;
let channel = self.by_id(id).await?;
@@ -230,38 +258,98 @@ impl<'c> Channels<'c> {
Ok(())
}
- pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> {
let channels = sqlx::query!(
r#"
select
channel.id as "id: Id",
- channel.name,
+ name.display_name as "display_name: String",
+ name.canonical_name as "canonical_name: String",
channel.created_at as "created_at: DateTime",
channel.created_sequence as "created_sequence: Sequence",
deleted.deleted_at as "deleted_at?: DateTime",
deleted.deleted_sequence as "deleted_sequence?: Sequence"
from channel
+ left join channel_name as name
+ using (id)
left join channel_deleted as deleted
using (id)
left join message
+ on channel.id = message.channel
where channel.created_at < $1
and message.id is null
and deleted.id is null
"#,
expired_at,
)
- .map(|row| History {
- channel: Channel {
- id: row.id,
- name: row.name.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- created: Instant::new(row.created_at, row.created_sequence),
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ channel: Channel {
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
})
- .fetch_all(&mut *self.0)
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
.await?;
Ok(channels)
}
+
+ pub async fn recanonicalize(&mut self) -> Result<(), sqlx::Error> {
+ let channels = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ display_name as "display_name: String"
+ from channel_name
+ "#,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ for channel in channels {
+ let name = Name::from(channel.display_name);
+ let canonical_name = name.canonical();
+
+ sqlx::query!(
+ r#"
+ update channel_name
+ set canonical_name = $1
+ where id = $2
+ "#,
+ canonical_name,
+ channel.id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+ }
+
+ Ok(())
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum LoadError {
+ Database(#[from] sqlx::Error),
+ Name(#[from] name::Error),
+}
+
+impl<T> NotFound for Result<T, LoadError> {
+ type Ok = T;
+ type Error = LoadError;
+
+ fn optional(self) -> Result<Option<T>, LoadError> {
+ match self {
+ Ok(value) => Ok(Some(value)),
+ Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None),
+ Err(other) => Err(other),
+ }
+ }
}
diff --git a/src/channel/routes/channel/post.rs b/src/channel/routes/channel/post.rs
index b489a77..d0cae05 100644
--- a/src/channel/routes/channel/post.rs
+++ b/src/channel/routes/channel/post.rs
@@ -9,7 +9,7 @@ use crate::{
clock::RequestedAt,
error::{Internal, NotFound},
login::Login,
- message::{app::SendError, Message},
+ message::{app::SendError, Body, Message},
};
pub async fn handler(
@@ -29,7 +29,7 @@ pub async fn handler(
#[derive(serde::Deserialize)]
pub struct Request {
- pub body: String,
+ pub body: Body,
}
#[derive(Debug)]
diff --git a/src/channel/routes/post.rs b/src/channel/routes/post.rs
index a05c312..9781dd7 100644
--- a/src/channel/routes/post.rs
+++ b/src/channel/routes/post.rs
@@ -10,6 +10,7 @@ use crate::{
clock::RequestedAt,
error::Internal,
login::Login,
+ name::Name,
};
pub async fn handler(
@@ -29,7 +30,7 @@ pub async fn handler(
#[derive(serde::Deserialize)]
pub struct Request {
- pub name: String,
+ pub name: Name,
}
#[derive(Debug)]
diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs
index 2b7d89a..129c0d6 100644
--- a/src/channel/snapshot.rs
+++ b/src/channel/snapshot.rs
@@ -2,12 +2,12 @@ use super::{
event::{Created, Event},
Id,
};
-use crate::clock::DateTime;
+use crate::{clock::DateTime, name::Name};
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Channel {
pub id: Id,
- pub name: String,
+ pub name: Name,
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option<DateTime>,
}