summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs65
-rw-r--r--src/channel/history.rs12
-rw-r--r--src/channel/mod.rs1
-rw-r--r--src/channel/repo.rs60
-rw-r--r--src/channel/routes/channel/delete.rs9
-rw-r--r--src/channel/routes/channel/test/delete.rs34
-rw-r--r--src/channel/routes/channel/test/post.rs3
-rw-r--r--src/channel/routes/post.rs3
-rw-r--r--src/channel/routes/test.rs27
-rw-r--r--src/channel/validate.rs23
10 files changed, 163 insertions, 74 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 8359277..21784e9 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -4,13 +4,13 @@ use sqlx::sqlite::SqlitePool;
use super::{
repo::{LoadError, Provider as _},
- Channel, Id,
+ validate, Channel, Id,
};
use crate::{
clock::DateTime,
db::{Duplicate as _, NotFound as _},
event::{repo::Provider as _, Broadcaster, Event, Sequence},
- message::repo::Provider as _,
+ message::{self, repo::Provider as _},
name::{self, Name},
};
@@ -25,6 +25,10 @@ impl<'a> Channels<'a> {
}
pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> {
+ if !validate::name(name) {
+ return Err(CreateError::InvalidName(name.clone()));
+ }
+
let mut tx = self.db.begin().await?;
let created = tx.sequence().next(created_at).await?;
let channel = tx
@@ -44,38 +48,36 @@ impl<'a> Channels<'a> {
// it exists in the specific moment when you call it.
pub async fn get(&self, channel: &Id) -> Result<Channel, Error> {
let not_found = || Error::NotFound(channel.clone());
+ let deleted = || Error::Deleted(channel.clone());
let mut tx = self.db.begin().await?;
let channel = tx.channels().by_id(channel).await.not_found(not_found)?;
tx.commit().await?;
- channel.as_snapshot().ok_or_else(not_found)
+ channel.as_snapshot().ok_or_else(deleted)
}
- pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
+ pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
.by_id(channel)
.await
- .not_found(|| Error::NotFound(channel.clone()))?;
+ .not_found(|| DeleteError::NotFound(channel.clone()))?;
channel
.as_snapshot()
- .ok_or_else(|| Error::Deleted(channel.id().clone()))?;
+ .ok_or_else(|| DeleteError::Deleted(channel.id().clone()))?;
let mut events = Vec::new();
let messages = tx.messages().live(&channel).await?;
- for message in messages {
- let deleted = tx.sequence().next(deleted_at).await?;
- let message = tx.messages().delete(&message, &deleted).await?;
- events.extend(
- message
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from),
- );
+ let has_messages = messages
+ .iter()
+ .map(message::History::as_snapshot)
+ .any(|message| message.is_some());
+ if has_messages {
+ return Err(DeleteError::NotEmpty(channel.id().clone()));
}
let deleted = tx.sequence().next(deleted_at).await?;
@@ -135,20 +137,14 @@ impl<'a> Channels<'a> {
Ok(())
}
-
- pub async fn recanonicalize(&self) -> Result<(), sqlx::Error> {
- let mut tx = self.db.begin().await?;
- tx.channels().recanonicalize().await?;
- tx.commit().await?;
-
- Ok(())
- }
}
#[derive(Debug, thiserror::Error)]
pub enum CreateError {
#[error("channel named {0} already exists")]
DuplicateName(Name),
+ #[error("invalid channel name: {0}")]
+ InvalidName(Name),
#[error(transparent)]
Database(#[from] sqlx::Error),
#[error(transparent)]
@@ -186,6 +182,29 @@ impl From<LoadError> for Error {
}
#[derive(Debug, thiserror::Error)]
+pub enum DeleteError {
+ #[error("channel {0} not found")]
+ NotFound(Id),
+ #[error("channel {0} deleted")]
+ Deleted(Id),
+ #[error("channel {0} not empty")]
+ NotEmpty(Id),
+ #[error(transparent)]
+ Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Name(#[from] name::Error),
+}
+
+impl From<LoadError> for DeleteError {
+ fn from(error: LoadError) -> Self {
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
pub enum ExpireError {
#[error(transparent)]
Database(#[from] sqlx::Error),
diff --git a/src/channel/history.rs b/src/channel/history.rs
index 4b9fcc7..ef2120d 100644
--- a/src/channel/history.rs
+++ b/src/channel/history.rs
@@ -1,8 +1,10 @@
+use itertools::Itertools as _;
+
use super::{
event::{Created, Deleted, Event},
Channel, Id,
};
-use crate::event::{Instant, ResumePoint, Sequence};
+use crate::event::{Instant, Sequence};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
@@ -26,9 +28,9 @@ impl History {
self.channel.clone()
}
- pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Channel> {
+ pub fn as_of(&self, resume_point: Sequence) -> Option<Channel> {
self.events()
- .filter(Sequence::up_to(resume_point.into()))
+ .filter(Sequence::up_to(resume_point))
.collect()
}
@@ -41,7 +43,9 @@ impl History {
// Event factories
impl History {
pub fn events(&self) -> impl Iterator<Item = Event> {
- [self.created()].into_iter().chain(self.deleted())
+ [self.created()]
+ .into_iter()
+ .merge_by(self.deleted(), Sequence::merge)
}
fn created(&self) -> Event {
diff --git a/src/channel/mod.rs b/src/channel/mod.rs
index eb8200b..d5ba828 100644
--- a/src/channel/mod.rs
+++ b/src/channel/mod.rs
@@ -5,5 +5,6 @@ mod id;
pub mod repo;
mod routes;
mod snapshot;
+mod validate;
pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Channel};
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index a49db52..6612151 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -5,7 +5,7 @@ use crate::{
channel::{Channel, History, Id},
clock::DateTime,
db::NotFound,
- event::{Instant, ResumePoint, Sequence},
+ event::{Instant, Sequence},
name::{self, Name},
};
@@ -32,12 +32,13 @@ impl<'c> Channels<'c> {
sqlx::query!(
r#"
insert
- into channel (id, created_at, created_sequence)
- values ($1, $2, $3)
+ into channel (id, created_at, created_sequence, last_sequence)
+ values ($1, $2, $3, $4)
"#,
id,
created.at,
created.sequence,
+ created.sequence,
)
.execute(&mut *self.0)
.await?;
@@ -144,13 +145,13 @@ impl<'c> Channels<'c> {
Ok(channels)
}
- pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> {
+ pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
let channels = sqlx::query!(
r#"
select
id as "id: Id",
- name.display_name as "display_name: String",
- name.canonical_name as "canonical_name: String",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
channel.created_at as "created_at: DateTime",
channel.created_sequence as "created_sequence: Sequence",
deleted.deleted_at as "deleted_at?: DateTime",
@@ -160,7 +161,7 @@ impl<'c> Channels<'c> {
using (id)
left join channel_deleted as deleted
using (id)
- where coalesce(channel.created_sequence > $1, true)
+ where channel.last_sequence > $1
"#,
resume_at,
)
@@ -191,6 +192,19 @@ impl<'c> Channels<'c> {
let id = channel.id();
sqlx::query!(
r#"
+ update channel
+ set last_sequence = max(last_sequence, $1)
+ where id = $2
+ returning id as "id: Id"
+ "#,
+ deleted.sequence,
+ id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
insert into channel_deleted (id, deleted_at, deleted_sequence)
values ($1, $2, $3)
"#,
@@ -300,38 +314,6 @@ impl<'c> Channels<'c> {
Ok(channels)
}
-
- pub async fn recanonicalize(&mut self) -> Result<(), sqlx::Error> {
- let channels = sqlx::query!(
- r#"
- select
- id as "id: Id",
- display_name as "display_name: String"
- from channel_name
- "#,
- )
- .fetch_all(&mut *self.0)
- .await?;
-
- for channel in channels {
- let name = Name::from(channel.display_name);
- let canonical_name = name.canonical();
-
- sqlx::query!(
- r#"
- update channel_name
- set canonical_name = $1
- where id = $2
- "#,
- canonical_name,
- channel.id,
- )
- .execute(&mut *self.0)
- .await?;
- }
-
- Ok(())
- }
}
#[derive(Debug, thiserror::Error)]
diff --git a/src/channel/routes/channel/delete.rs b/src/channel/routes/channel/delete.rs
index 2d2b5f1..9c093c1 100644
--- a/src/channel/routes/channel/delete.rs
+++ b/src/channel/routes/channel/delete.rs
@@ -36,14 +36,19 @@ impl IntoResponse for Response {
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
-pub struct Error(#[from] pub app::Error);
+pub struct Error(#[from] pub app::DeleteError);
impl IntoResponse for Error {
fn into_response(self) -> response::Response {
let Self(error) = self;
#[allow(clippy::match_wildcard_for_single_variants)]
match error {
- app::Error::NotFound(_) | app::Error::Deleted(_) => NotFound(error).into_response(),
+ app::DeleteError::NotFound(_) | app::DeleteError::Deleted(_) => {
+ NotFound(error).into_response()
+ }
+ app::DeleteError::NotEmpty(_) => {
+ (StatusCode::CONFLICT, error.to_string()).into_response()
+ }
other => Internal::from(other).into_response(),
}
}
diff --git a/src/channel/routes/channel/test/delete.rs b/src/channel/routes/channel/test/delete.rs
index 0371b0a..77a0b03 100644
--- a/src/channel/routes/channel/test/delete.rs
+++ b/src/channel/routes/channel/test/delete.rs
@@ -55,7 +55,7 @@ pub async fn invalid_channel_id() {
// Verify the response
- assert!(matches!(error, app::Error::NotFound(id) if id == channel));
+ assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel));
}
#[tokio::test]
@@ -84,7 +84,7 @@ pub async fn channel_deleted() {
// Verify the response
- assert!(matches!(error, app::Error::Deleted(id) if id == channel.id));
+ assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id));
}
#[tokio::test]
@@ -113,7 +113,7 @@ pub async fn channel_expired() {
// Verify the response
- assert!(matches!(error, app::Error::Deleted(id) if id == channel.id));
+ assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id));
}
#[tokio::test]
@@ -147,5 +147,31 @@ pub async fn channel_purged() {
// Verify the response
- assert!(matches!(error, app::Error::NotFound(id) if id == channel.id));
+ assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel.id));
+}
+
+#[tokio::test]
+pub async fn channel_not_empty() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let delete::Error(error) = delete::handler(
+ State(app.clone()),
+ Path(channel.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting a channel with messages fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == channel.id));
}
diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs
index 111a703..bc0684b 100644
--- a/src/channel/routes/channel/test/post.rs
+++ b/src/channel/routes/channel/test/post.rs
@@ -15,6 +15,7 @@ async fn messages_in_order() {
let app = fixtures::scratch_app().await;
let sender = fixtures::identity::create(&app, &fixtures::now()).await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Call the endpoint (twice)
@@ -41,7 +42,7 @@ async fn messages_in_order() {
let mut events = app
.events()
- .subscribe(None)
+ .subscribe(resume_point)
.await
.expect("subscribing to a valid channel succeeds")
.filter_map(fixtures::event::message)
diff --git a/src/channel/routes/post.rs b/src/channel/routes/post.rs
index 810445c..2cf1cc0 100644
--- a/src/channel/routes/post.rs
+++ b/src/channel/routes/post.rs
@@ -54,6 +54,9 @@ impl IntoResponse for Error {
app::CreateError::DuplicateName(_) => {
(StatusCode::CONFLICT, error.to_string()).into_response()
}
+ app::CreateError::InvalidName(_) => {
+ (StatusCode::BAD_REQUEST, error.to_string()).into_response()
+ }
other => Internal::from(other).into_response(),
}
}
diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs
index 10b1e8d..cba8f2e 100644
--- a/src/channel/routes/test.rs
+++ b/src/channel/routes/test.rs
@@ -16,6 +16,7 @@ async fn new_channel() {
let app = fixtures::scratch_app().await;
let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Call the endpoint
@@ -44,7 +45,7 @@ async fn new_channel() {
let mut events = app
.events()
- .subscribe(None)
+ .subscribe(resume_point)
.await
.expect("subscribing never fails")
.filter_map(fixtures::event::channel)
@@ -116,6 +117,30 @@ async fn conflicting_canonical_name() {
}
#[tokio::test]
+async fn invalid_name() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let name = fixtures::channel::propose_invalid_name();
+ let request = post::Request { name: name.clone() };
+ let post::Error(error) =
+ post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect_err("invalid channel name should fail the request");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ app::CreateError::InvalidName(error_name) if name == error_name
+ ));
+}
+
+#[tokio::test]
async fn name_reusable_after_delete() {
// Set up the environment
diff --git a/src/channel/validate.rs b/src/channel/validate.rs
new file mode 100644
index 0000000..0c97293
--- /dev/null
+++ b/src/channel/validate.rs
@@ -0,0 +1,23 @@
+use unicode_segmentation::UnicodeSegmentation as _;
+
+use crate::name::Name;
+
+// Picked out of a hat. The power of two is not meaningful.
+const NAME_TOO_LONG: usize = 64;
+
+pub fn name(name: &Name) -> bool {
+ let display = name.display();
+
+ [
+ display.graphemes(true).count() < NAME_TOO_LONG,
+ display.chars().all(|ch| !ch.is_control()),
+ display.chars().next().is_some_and(|c| !c.is_whitespace()),
+ display.chars().last().is_some_and(|c| !c.is_whitespace()),
+ display
+ .chars()
+ .zip(display.chars().skip(1))
+ .all(|(a, b)| !(a.is_whitespace() && b.is_whitespace())),
+ ]
+ .into_iter()
+ .all(|value| value)
+}