diff options
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 65 | ||||
| -rw-r--r-- | src/channel/history.rs | 12 | ||||
| -rw-r--r-- | src/channel/mod.rs | 1 | ||||
| -rw-r--r-- | src/channel/repo.rs | 60 | ||||
| -rw-r--r-- | src/channel/routes/channel/delete.rs | 9 | ||||
| -rw-r--r-- | src/channel/routes/channel/test/delete.rs | 34 | ||||
| -rw-r--r-- | src/channel/routes/channel/test/post.rs | 3 | ||||
| -rw-r--r-- | src/channel/routes/post.rs | 3 | ||||
| -rw-r--r-- | src/channel/routes/test.rs | 27 | ||||
| -rw-r--r-- | src/channel/validate.rs | 23 |
10 files changed, 163 insertions, 74 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 8359277..21784e9 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -4,13 +4,13 @@ use sqlx::sqlite::SqlitePool; use super::{ repo::{LoadError, Provider as _}, - Channel, Id, + validate, Channel, Id, }; use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, event::{repo::Provider as _, Broadcaster, Event, Sequence}, - message::repo::Provider as _, + message::{self, repo::Provider as _}, name::{self, Name}, }; @@ -25,6 +25,10 @@ impl<'a> Channels<'a> { } pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> { + if !validate::name(name) { + return Err(CreateError::InvalidName(name.clone())); + } + let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; let channel = tx @@ -44,38 +48,36 @@ impl<'a> Channels<'a> { // it exists in the specific moment when you call it. pub async fn get(&self, channel: &Id) -> Result<Channel, Error> { let not_found = || Error::NotFound(channel.clone()); + let deleted = || Error::Deleted(channel.clone()); let mut tx = self.db.begin().await?; let channel = tx.channels().by_id(channel).await.not_found(not_found)?; tx.commit().await?; - channel.as_snapshot().ok_or_else(not_found) + channel.as_snapshot().ok_or_else(deleted) } - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await - .not_found(|| Error::NotFound(channel.clone()))?; + .not_found(|| DeleteError::NotFound(channel.clone()))?; channel .as_snapshot() - .ok_or_else(|| Error::Deleted(channel.id().clone()))?; + .ok_or_else(|| DeleteError::Deleted(channel.id().clone()))?; let mut events = Vec::new(); let messages = tx.messages().live(&channel).await?; - for message in messages { - let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(&message, &deleted).await?; - events.extend( - message - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from), - ); + let has_messages = messages + .iter() + .map(message::History::as_snapshot) + .any(|message| message.is_some()); + if has_messages { + return Err(DeleteError::NotEmpty(channel.id().clone())); } let deleted = tx.sequence().next(deleted_at).await?; @@ -135,20 +137,14 @@ impl<'a> Channels<'a> { Ok(()) } - - pub async fn recanonicalize(&self) -> Result<(), sqlx::Error> { - let mut tx = self.db.begin().await?; - tx.channels().recanonicalize().await?; - tx.commit().await?; - - Ok(()) - } } #[derive(Debug, thiserror::Error)] pub enum CreateError { #[error("channel named {0} already exists")] DuplicateName(Name), + #[error("invalid channel name: {0}")] + InvalidName(Name), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] @@ -186,6 +182,29 @@ impl From<LoadError> for Error { } #[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("channel {0} not found")] + NotFound(Id), + #[error("channel {0} deleted")] + Deleted(Id), + #[error("channel {0} not empty")] + NotEmpty(Id), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for DeleteError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] pub enum ExpireError { #[error(transparent)] Database(#[from] sqlx::Error), diff --git a/src/channel/history.rs b/src/channel/history.rs index 4b9fcc7..ef2120d 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -1,8 +1,10 @@ +use itertools::Itertools as _; + use super::{ event::{Created, Deleted, Event}, Channel, Id, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -26,9 +28,9 @@ impl History { self.channel.clone() } - pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Channel> { + pub fn as_of(&self, resume_point: Sequence) -> Option<Channel> { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } @@ -41,7 +43,9 @@ impl History { // Event factories impl History { pub fn events(&self) -> impl Iterator<Item = Event> { - [self.created()].into_iter().chain(self.deleted()) + [self.created()] + .into_iter() + .merge_by(self.deleted(), Sequence::merge) } fn created(&self) -> Event { diff --git a/src/channel/mod.rs b/src/channel/mod.rs index eb8200b..d5ba828 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -5,5 +5,6 @@ mod id; pub mod repo; mod routes; mod snapshot; +mod validate; pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Channel}; diff --git a/src/channel/repo.rs b/src/channel/repo.rs index a49db52..6612151 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -5,7 +5,7 @@ use crate::{ channel::{Channel, History, Id}, clock::DateTime, db::NotFound, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, name::{self, Name}, }; @@ -32,12 +32,13 @@ impl<'c> Channels<'c> { sqlx::query!( r#" insert - into channel (id, created_at, created_sequence) - values ($1, $2, $3) + into channel (id, created_at, created_sequence, last_sequence) + values ($1, $2, $3, $4) "#, id, created.at, created.sequence, + created.sequence, ) .execute(&mut *self.0) .await?; @@ -144,13 +145,13 @@ impl<'c> Channels<'c> { Ok(channels) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { let channels = sqlx::query!( r#" select id as "id: Id", - name.display_name as "display_name: String", - name.canonical_name as "canonical_name: String", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", @@ -160,7 +161,7 @@ impl<'c> Channels<'c> { using (id) left join channel_deleted as deleted using (id) - where coalesce(channel.created_sequence > $1, true) + where channel.last_sequence > $1 "#, resume_at, ) @@ -191,6 +192,19 @@ impl<'c> Channels<'c> { let id = channel.id(); sqlx::query!( r#" + update channel + set last_sequence = max(last_sequence, $1) + where id = $2 + returning id as "id: Id" + "#, + deleted.sequence, + id, + ) + .fetch_one(&mut *self.0) + .await?; + + sqlx::query!( + r#" insert into channel_deleted (id, deleted_at, deleted_sequence) values ($1, $2, $3) "#, @@ -300,38 +314,6 @@ impl<'c> Channels<'c> { Ok(channels) } - - pub async fn recanonicalize(&mut self) -> Result<(), sqlx::Error> { - let channels = sqlx::query!( - r#" - select - id as "id: Id", - display_name as "display_name: String" - from channel_name - "#, - ) - .fetch_all(&mut *self.0) - .await?; - - for channel in channels { - let name = Name::from(channel.display_name); - let canonical_name = name.canonical(); - - sqlx::query!( - r#" - update channel_name - set canonical_name = $1 - where id = $2 - "#, - canonical_name, - channel.id, - ) - .execute(&mut *self.0) - .await?; - } - - Ok(()) - } } #[derive(Debug, thiserror::Error)] diff --git a/src/channel/routes/channel/delete.rs b/src/channel/routes/channel/delete.rs index 2d2b5f1..9c093c1 100644 --- a/src/channel/routes/channel/delete.rs +++ b/src/channel/routes/channel/delete.rs @@ -36,14 +36,19 @@ impl IntoResponse for Response { #[derive(Debug, thiserror::Error)] #[error(transparent)] -pub struct Error(#[from] pub app::Error); +pub struct Error(#[from] pub app::DeleteError); impl IntoResponse for Error { fn into_response(self) -> response::Response { let Self(error) = self; #[allow(clippy::match_wildcard_for_single_variants)] match error { - app::Error::NotFound(_) | app::Error::Deleted(_) => NotFound(error).into_response(), + app::DeleteError::NotFound(_) | app::DeleteError::Deleted(_) => { + NotFound(error).into_response() + } + app::DeleteError::NotEmpty(_) => { + (StatusCode::CONFLICT, error.to_string()).into_response() + } other => Internal::from(other).into_response(), } } diff --git a/src/channel/routes/channel/test/delete.rs b/src/channel/routes/channel/test/delete.rs index 0371b0a..77a0b03 100644 --- a/src/channel/routes/channel/test/delete.rs +++ b/src/channel/routes/channel/test/delete.rs @@ -55,7 +55,7 @@ pub async fn invalid_channel_id() { // Verify the response - assert!(matches!(error, app::Error::NotFound(id) if id == channel)); + assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel)); } #[tokio::test] @@ -84,7 +84,7 @@ pub async fn channel_deleted() { // Verify the response - assert!(matches!(error, app::Error::Deleted(id) if id == channel.id)); + assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id)); } #[tokio::test] @@ -113,7 +113,7 @@ pub async fn channel_expired() { // Verify the response - assert!(matches!(error, app::Error::Deleted(id) if id == channel.id)); + assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id)); } #[tokio::test] @@ -147,5 +147,31 @@ pub async fn channel_purged() { // Verify the response - assert!(matches!(error, app::Error::NotFound(id) if id == channel.id)); + assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel.id)); +} + +#[tokio::test] +pub async fn channel_not_empty() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let delete::Error(error) = delete::handler( + State(app.clone()), + Path(channel.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a channel with messages fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == channel.id)); } diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs index 111a703..bc0684b 100644 --- a/src/channel/routes/channel/test/post.rs +++ b/src/channel/routes/channel/test/post.rs @@ -15,6 +15,7 @@ async fn messages_in_order() { let app = fixtures::scratch_app().await; let sender = fixtures::identity::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint (twice) @@ -41,7 +42,7 @@ async fn messages_in_order() { let mut events = app .events() - .subscribe(None) + .subscribe(resume_point) .await .expect("subscribing to a valid channel succeeds") .filter_map(fixtures::event::message) diff --git a/src/channel/routes/post.rs b/src/channel/routes/post.rs index 810445c..2cf1cc0 100644 --- a/src/channel/routes/post.rs +++ b/src/channel/routes/post.rs @@ -54,6 +54,9 @@ impl IntoResponse for Error { app::CreateError::DuplicateName(_) => { (StatusCode::CONFLICT, error.to_string()).into_response() } + app::CreateError::InvalidName(_) => { + (StatusCode::BAD_REQUEST, error.to_string()).into_response() + } other => Internal::from(other).into_response(), } } diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs index 10b1e8d..cba8f2e 100644 --- a/src/channel/routes/test.rs +++ b/src/channel/routes/test.rs @@ -16,6 +16,7 @@ async fn new_channel() { let app = fixtures::scratch_app().await; let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -44,7 +45,7 @@ async fn new_channel() { let mut events = app .events() - .subscribe(None) + .subscribe(resume_point) .await .expect("subscribing never fails") .filter_map(fixtures::event::channel) @@ -116,6 +117,30 @@ async fn conflicting_canonical_name() { } #[tokio::test] +async fn invalid_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let name = fixtures::channel::propose_invalid_name(); + let request = post::Request { name: name.clone() }; + let post::Error(error) = + post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect_err("invalid channel name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::InvalidName(error_name) if name == error_name + )); +} + +#[tokio::test] async fn name_reusable_after_delete() { // Set up the environment diff --git a/src/channel/validate.rs b/src/channel/validate.rs new file mode 100644 index 0000000..0c97293 --- /dev/null +++ b/src/channel/validate.rs @@ -0,0 +1,23 @@ +use unicode_segmentation::UnicodeSegmentation as _; + +use crate::name::Name; + +// Picked out of a hat. The power of two is not meaningful. +const NAME_TOO_LONG: usize = 64; + +pub fn name(name: &Name) -> bool { + let display = name.display(); + + [ + display.graphemes(true).count() < NAME_TOO_LONG, + display.chars().all(|ch| !ch.is_control()), + display.chars().next().is_some_and(|c| !c.is_whitespace()), + display.chars().last().is_some_and(|c| !c.is_whitespace()), + display + .chars() + .zip(display.chars().skip(1)) + .all(|(a, b)| !(a.is_whitespace() && b.is_whitespace())), + ] + .into_iter() + .all(|value| value) +} |
