diff options
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 89 | ||||
| -rw-r--r-- | src/channel/history.rs | 5 | ||||
| -rw-r--r-- | src/channel/repo.rs | 349 | ||||
| -rw-r--r-- | src/channel/routes.rs | 121 | ||||
| -rw-r--r-- | src/channel/routes/channel/delete.rs | 39 | ||||
| -rw-r--r-- | src/channel/routes/channel/mod.rs | 9 | ||||
| -rw-r--r-- | src/channel/routes/channel/post.rs | 58 | ||||
| -rw-r--r-- | src/channel/routes/channel/test/delete.rs | 154 | ||||
| -rw-r--r-- | src/channel/routes/channel/test/mod.rs | 2 | ||||
| -rw-r--r-- | src/channel/routes/channel/test/post.rs (renamed from src/channel/routes/test/on_send.rs) | 71 | ||||
| -rw-r--r-- | src/channel/routes/mod.rs | 19 | ||||
| -rw-r--r-- | src/channel/routes/post.rs | 60 | ||||
| -rw-r--r-- | src/channel/routes/test.rs | 221 | ||||
| -rw-r--r-- | src/channel/routes/test/mod.rs | 2 | ||||
| -rw-r--r-- | src/channel/routes/test/on_create.rs | 88 | ||||
| -rw-r--r-- | src/channel/snapshot.rs | 5 |
16 files changed, 951 insertions, 341 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 5d6cada..7bfa0f7 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,12 +2,16 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{repo::Provider as _, Channel, History, Id}; +use super::{ + repo::{LoadError, Provider as _}, + Channel, History, Id, +}; use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, event::{repo::Provider as _, Broadcaster, Event, Sequence}, message::repo::Provider as _, + name::{self, Name}, }; pub struct Channels<'a> { @@ -20,14 +24,14 @@ impl<'a> Channels<'a> { Self { db, events } } - pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> { + pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> { let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; let channel = tx .channels() .create(name, &created) .await - .duplicate(|| CreateError::DuplicateName(name.into()))?; + .duplicate(|| CreateError::DuplicateName(name.clone()))?; tx.commit().await?; self.events @@ -38,12 +42,12 @@ impl<'a> Channels<'a> { // This function is careless with respect to time, and gets you the channel as // it exists in the specific moment when you call it. - pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, sqlx::Error> { + pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, Error> { let mut tx = self.db.begin().await?; let channel = tx.channels().by_id(channel).await.optional()?; tx.commit().await?; - Ok(channel.iter().flat_map(History::events).collect()) + Ok(channel.as_ref().and_then(History::as_snapshot)) } pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { @@ -54,13 +58,16 @@ impl<'a> Channels<'a> { .by_id(channel) .await .not_found(|| Error::NotFound(channel.clone()))?; + channel + .as_snapshot() + .ok_or_else(|| Error::Deleted(channel.id().clone()))?; let mut events = Vec::new(); - let messages = tx.messages().in_channel(&channel, None).await?; + let messages = tx.messages().live(&channel).await?; for message in messages { let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(message.id(), &deleted).await?; + let message = tx.messages().delete(&message, &deleted).await?; events.extend( message .events() @@ -70,7 +77,7 @@ impl<'a> Channels<'a> { } let deleted = tx.sequence().next(deleted_at).await?; - let channel = tx.channels().delete(channel.id(), &deleted).await?; + let channel = tx.channels().delete(&channel, &deleted).await?; events.extend( channel .events() @@ -85,7 +92,7 @@ impl<'a> Channels<'a> { Ok(()) } - pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); @@ -115,26 +122,80 @@ impl<'a> Channels<'a> { Ok(()) } + + pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, purge after 7 days. + let purge_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + tx.channels().purge(&purge_at).await?; + tx.commit().await?; + + Ok(()) + } + + pub async fn recanonicalize(&self) -> Result<(), sqlx::Error> { + let mut tx = self.db.begin().await?; + tx.channels().recanonicalize().await?; + tx.commit().await?; + + Ok(()) + } } #[derive(Debug, thiserror::Error)] pub enum CreateError { #[error("channel named {0} already exists")] - DuplicateName(String), + DuplicateName(Name), + #[error(transparent)] + Database(#[from] sqlx::Error), #[error(transparent)] - DatabaseError(#[from] sqlx::Error), + Name(#[from] name::Error), +} + +impl From<LoadError> for CreateError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } } #[derive(Debug, thiserror::Error)] pub enum Error { #[error("channel {0} not found")] NotFound(Id), + #[error("channel {0} deleted")] + Deleted(Id), #[error(transparent)] - DatabaseError(#[from] sqlx::Error), + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for Error { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } } #[derive(Debug, thiserror::Error)] -pub enum InternalError { +pub enum ExpireError { #[error(transparent)] - DatabaseError(#[from] sqlx::Error), + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for ExpireError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } } diff --git a/src/channel/history.rs b/src/channel/history.rs index 78b3437..4b9fcc7 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -31,6 +31,11 @@ impl History { .filter(Sequence::up_to(resume_point.into())) .collect() } + + // Snapshot of this channel as of all events recorded in this history. + pub fn as_snapshot(&self) -> Option<Channel> { + self.events().collect() + } } // Event factories diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 2f57581..a49db52 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -1,9 +1,12 @@ +use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{Channel, History, Id}, clock::DateTime, + db::NotFound, event::{Instant, ResumePoint, Sequence}, + name::{self, Name}, }; pub trait Provider { @@ -19,130 +22,162 @@ impl<'c> Provider for Transaction<'c, Sqlite> { pub struct Channels<'t>(&'t mut SqliteConnection); impl<'c> Channels<'c> { - pub async fn create(&mut self, name: &str, created: &Instant) -> Result<History, sqlx::Error> { + pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> { let id = Id::generate(); - let channel = sqlx::query!( + let name = name.clone(); + let display_name = name.display(); + let canonical_name = name.canonical(); + let created = *created; + + sqlx::query!( r#" insert - into channel (id, name, created_at, created_sequence) - values ($1, $2, $3, $4) - returning - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + into channel (id, created_at, created_sequence) + values ($1, $2, $3) "#, id, - name, created.at, created.sequence, ) - .map(|row| History { + .execute(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into channel_name (id, display_name, canonical_name) + values ($1, $2, $3) + "#, + id, + display_name, + canonical_name, + ) + .execute(&mut *self.0) + .await?; + + let channel = History { channel: Channel { - id: row.id, - name: row.name, - }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, + id, + name: name.clone(), + deleted_at: None, }, + created, deleted: None, - }) - .fetch_one(&mut *self.0) - .await?; + }; Ok(channel) } - pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> { + pub async fn by_id(&mut self, channel: &Id) -> Result<History, LoadError> { let channel = sqlx::query!( r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel + left join channel_name as name + using (id) + left join channel_deleted as deleted + using (id) where id = $1 "#, channel, ) - .map(|row| History { - channel: Channel { - id: row.id, - name: row.name, - }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + .map(|row| { + Ok::<_, name::Error>(History { + channel: Channel { + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) }) .fetch_one(&mut *self.0) - .await?; + .await??; Ok(channel) } - pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { + pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { let channels = sqlx::query!( r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel - where coalesce(created_sequence <= $1, true) - order by channel.name + left join channel_name as name + using (id) + left join channel_deleted as deleted + using (id) + where channel.created_sequence <= $1 + order by name.canonical_name "#, resume_at, ) - .map(|row| History { - channel: Channel { - id: row.id, - name: row.name, - }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + .map(|row| { + Ok::<_, name::Error>(History { + channel: Channel { + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) }) - .fetch_all(&mut *self.0) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() .await?; Ok(channels) } - pub async fn replay( - &mut self, - resume_at: Option<Sequence>, - ) -> Result<Vec<History>, sqlx::Error> { + pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> { let channels = sqlx::query!( r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + name.display_name as "display_name: String", + name.canonical_name as "canonical_name: String", + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel - where coalesce(created_sequence > $1, true) + left join channel_name as name + using (id) + left join channel_deleted as deleted + using (id) + where coalesce(channel.created_sequence > $1, true) "#, resume_at, ) - .map(|row| History { - channel: Channel { - id: row.id, - name: row.name, - }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + .map(|row| { + Ok::<_, name::Error>(History { + channel: Channel { + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) }) - .fetch_all(&mut *self.0) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() .await?; Ok(channels) @@ -150,53 +185,171 @@ impl<'c> Channels<'c> { pub async fn delete( &mut self, - channel: &Id, + channel: &History, deleted: &Instant, - ) -> Result<History, sqlx::Error> { - let channel = sqlx::query!( + ) -> Result<History, LoadError> { + let id = channel.id(); + sqlx::query!( r#" - delete from channel + insert into channel_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + deleted.at, + deleted.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a channel is deleted, its name is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at channels via the API, cannot retrieve the + // "deleted" channel's information by ignoring the deletion event. + // + // This also avoids the need for a separate name reservation table to ensure + // that live channels have unique names, since the `channel` table's name field + // is unique over non-null values. + sqlx::query!( + r#" + delete from channel_name where id = $1 - returning - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" "#, - channel, + id, ) - .map(|row| History { - channel: Channel { - id: row.id, - name: row.name, - }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: Some(*deleted), - }) - .fetch_one(&mut *self.0) + .execute(&mut *self.0) .await?; + let channel = self.by_id(id).await?; + Ok(channel) } - pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let channels = sqlx::query_scalar!( r#" + with has_messages as ( + select channel + from message + group by channel + ) + delete from channel_deleted + where deleted_at < $1 + and id not in has_messages + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for channel in channels { + // Wanted: a way to batch these up into one query. + sqlx::query!( + r#" + delete from channel + where id = $1 + "#, + channel, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> { + let channels = sqlx::query!( + r#" select - channel.id as "id: Id" + channel.id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel - left join message - where created_at < $1 + left join channel_name as name + using (id) + left join channel_deleted as deleted + using (id) + left join message + on channel.id = message.channel + where channel.created_at < $1 and message.id is null + and deleted.id is null "#, expired_at, ) - .fetch_all(&mut *self.0) + .map(|row| { + Ok::<_, name::Error>(History { + channel: Channel { + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() .await?; Ok(channels) } + + pub async fn recanonicalize(&mut self) -> Result<(), sqlx::Error> { + let channels = sqlx::query!( + r#" + select + id as "id: Id", + display_name as "display_name: String" + from channel_name + "#, + ) + .fetch_all(&mut *self.0) + .await?; + + for channel in channels { + let name = Name::from(channel.display_name); + let canonical_name = name.canonical(); + + sqlx::query!( + r#" + update channel_name + set canonical_name = $1 + where id = $2 + "#, + canonical_name, + channel.id, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum LoadError { + Database(#[from] sqlx::Error), + Name(#[from] name::Error), +} + +impl<T> NotFound for Result<T, LoadError> { + type Ok = T; + type Error = LoadError; + + fn optional(self) -> Result<Option<T>, LoadError> { + match self { + Ok(value) => Ok(Some(value)), + Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None), + Err(other) => Err(other), + } + } } diff --git a/src/channel/routes.rs b/src/channel/routes.rs deleted file mode 100644 index eaf7962..0000000 --- a/src/channel/routes.rs +++ /dev/null @@ -1,121 +0,0 @@ -use axum::{ - extract::{Json, Path, State}, - http::StatusCode, - response::{IntoResponse, Response}, - routing::{delete, post}, - Router, -}; - -use super::{app, Channel, Id}; -use crate::{ - app::App, - clock::RequestedAt, - error::{Internal, NotFound}, - login::Login, - message::app::SendError, -}; - -#[cfg(test)] -mod test; - -pub fn router() -> Router<App> { - Router::new() - .route("/api/channels", post(on_create)) - .route("/api/channels/:channel", post(on_send)) - .route("/api/channels/:channel", delete(on_delete)) -} - -#[derive(Clone, serde::Deserialize)] -struct CreateRequest { - name: String, -} - -async fn on_create( - State(app): State<App>, - _: Login, // requires auth, but doesn't actually care who you are - RequestedAt(created_at): RequestedAt, - Json(form): Json<CreateRequest>, -) -> Result<Json<Channel>, CreateError> { - let channel = app - .channels() - .create(&form.name, &created_at) - .await - .map_err(CreateError)?; - - Ok(Json(channel)) -} - -#[derive(Debug)] -struct CreateError(app::CreateError); - -impl IntoResponse for CreateError { - fn into_response(self) -> Response { - let Self(error) = self; - match error { - duplicate @ app::CreateError::DuplicateName(_) => { - (StatusCode::CONFLICT, duplicate.to_string()).into_response() - } - other => Internal::from(other).into_response(), - } - } -} - -#[derive(Clone, serde::Deserialize)] -struct SendRequest { - body: String, -} - -async fn on_send( - State(app): State<App>, - Path(channel): Path<Id>, - RequestedAt(sent_at): RequestedAt, - login: Login, - Json(request): Json<SendRequest>, -) -> Result<StatusCode, SendErrorResponse> { - app.messages() - .send(&channel, &login, &sent_at, &request.body) - .await?; - - Ok(StatusCode::ACCEPTED) -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -struct SendErrorResponse(#[from] SendError); - -impl IntoResponse for SendErrorResponse { - fn into_response(self) -> Response { - let Self(error) = self; - match error { - not_found @ SendError::ChannelNotFound(_) => NotFound(not_found).into_response(), - other => Internal::from(other).into_response(), - } - } -} - -async fn on_delete( - State(app): State<App>, - Path(channel): Path<Id>, - RequestedAt(deleted_at): RequestedAt, - _: Login, -) -> Result<StatusCode, ErrorResponse> { - app.channels().delete(&channel, &deleted_at).await?; - - Ok(StatusCode::ACCEPTED) -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -struct ErrorResponse(#[from] app::Error); - -impl IntoResponse for ErrorResponse { - fn into_response(self) -> Response { - let Self(error) = self; - match error { - not_found @ app::Error::NotFound(_) => { - (StatusCode::NOT_FOUND, not_found.to_string()).into_response() - } - other => Internal::from(other).into_response(), - } - } -} diff --git a/src/channel/routes/channel/delete.rs b/src/channel/routes/channel/delete.rs new file mode 100644 index 0000000..91eb506 --- /dev/null +++ b/src/channel/routes/channel/delete.rs @@ -0,0 +1,39 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, +}; + +use crate::{ + app::App, + channel::app, + clock::RequestedAt, + error::{Internal, NotFound}, + token::extract::Identity, +}; + +pub async fn handler( + State(app): State<App>, + Path(channel): Path<super::PathInfo>, + RequestedAt(deleted_at): RequestedAt, + _: Identity, +) -> Result<StatusCode, Error> { + app.channels().delete(&channel, &deleted_at).await?; + + Ok(StatusCode::ACCEPTED) +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct Error(#[from] pub app::Error); + +impl IntoResponse for Error { + fn into_response(self) -> Response { + let Self(error) = self; + #[allow(clippy::match_wildcard_for_single_variants)] + match error { + app::Error::NotFound(_) | app::Error::Deleted(_) => NotFound(error).into_response(), + other => Internal::from(other).into_response(), + } + } +} diff --git a/src/channel/routes/channel/mod.rs b/src/channel/routes/channel/mod.rs new file mode 100644 index 0000000..31a9142 --- /dev/null +++ b/src/channel/routes/channel/mod.rs @@ -0,0 +1,9 @@ +use crate::channel::Id; + +pub mod delete; +pub mod post; + +#[cfg(test)] +mod test; + +type PathInfo = Id; diff --git a/src/channel/routes/channel/post.rs b/src/channel/routes/channel/post.rs new file mode 100644 index 0000000..b51e691 --- /dev/null +++ b/src/channel/routes/channel/post.rs @@ -0,0 +1,58 @@ +use axum::{ + extract::{Json, Path, State}, + http::StatusCode, + response::{self, IntoResponse}, +}; + +use crate::{ + app::App, + clock::RequestedAt, + error::{Internal, NotFound}, + message::{app::SendError, Body, Message}, + token::extract::Identity, +}; + +pub async fn handler( + State(app): State<App>, + Path(channel): Path<super::PathInfo>, + RequestedAt(sent_at): RequestedAt, + identity: Identity, + Json(request): Json<Request>, +) -> Result<Response, Error> { + let message = app + .messages() + .send(&channel, &identity.login, &sent_at, &request.body) + .await?; + + Ok(Response(message)) +} + +#[derive(serde::Deserialize)] +pub struct Request { + pub body: Body, +} + +#[derive(Debug)] +pub struct Response(pub Message); + +impl IntoResponse for Response { + fn into_response(self) -> response::Response { + let Self(message) = self; + (StatusCode::ACCEPTED, Json(message)).into_response() + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct Error(#[from] pub SendError); + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + let Self(error) = self; + #[allow(clippy::match_wildcard_for_single_variants)] + match error { + SendError::ChannelNotFound(_) => NotFound(error).into_response(), + other => Internal::from(other).into_response(), + } + } +} diff --git a/src/channel/routes/channel/test/delete.rs b/src/channel/routes/channel/test/delete.rs new file mode 100644 index 0000000..e9af12f --- /dev/null +++ b/src/channel/routes/channel/test/delete.rs @@ -0,0 +1,154 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, +}; + +use crate::{ + channel::{app, routes::channel::delete}, + test::fixtures, +}; + +#[tokio::test] +pub async fn delete_channel() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let response = delete::handler( + State(app.clone()), + Path(channel.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect("deleting a valid channel succeeds"); + + // Verify the response + + assert_eq!(response, StatusCode::ACCEPTED); + + // Verify the semantics + + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + assert!(!snapshot.channels.contains(&channel)); +} + +#[tokio::test] +pub async fn delete_invalid_channel_id() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let channel = fixtures::channel::fictitious(); + let delete::Error(error) = delete::handler( + State(app.clone()), + Path(channel.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a nonexistent channel fails"); + + // Verify the response + + assert!(matches!(error, app::Error::NotFound(id) if id == channel)); +} + +#[tokio::test] +pub async fn delete_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + + app.channels() + .delete(&channel.id, &fixtures::now()) + .await + .expect("deleting a recently-sent channel succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let delete::Error(error) = delete::handler( + State(app.clone()), + Path(channel.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a deleted channel fails"); + + // Verify the response + + assert!(matches!(error, app::Error::Deleted(id) if id == channel.id)); +} + +#[tokio::test] +pub async fn delete_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + + app.channels() + .expire(&fixtures::now()) + .await + .expect("expiring channels always succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let delete::Error(error) = delete::handler( + State(app.clone()), + Path(channel.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting an expired channel fails"); + + // Verify the response + + assert!(matches!(error, app::Error::Deleted(id) if id == channel.id)); +} + +#[tokio::test] +pub async fn delete_purged() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + + app.channels() + .expire(&fixtures::old()) + .await + .expect("expiring channels always succeeds"); + + app.channels() + .purge(&fixtures::now()) + .await + .expect("purging channels always succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let delete::Error(error) = delete::handler( + State(app.clone()), + Path(channel.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a purged channel fails"); + + // Verify the response + + assert!(matches!(error, app::Error::NotFound(id) if id == channel.id)); +} diff --git a/src/channel/routes/channel/test/mod.rs b/src/channel/routes/channel/test/mod.rs new file mode 100644 index 0000000..78bf86e --- /dev/null +++ b/src/channel/routes/channel/test/mod.rs @@ -0,0 +1,2 @@ +mod delete; +mod post; diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/channel/test/post.rs index 293cc56..67e7d36 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/channel/test/post.rs @@ -2,9 +2,8 @@ use axum::extract::{Json, Path, State}; use futures::stream::StreamExt; use crate::{ - channel, - channel::routes, - event::{self, Sequenced}, + channel::{self, routes::channel::post}, + event::Sequenced, message::{self, app::SendError}, test::fixtures::{self, future::Immediately as _}, }; @@ -14,7 +13,7 @@ async fn messages_in_order() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app, &fixtures::now()).await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint (twice) @@ -25,17 +24,17 @@ async fn messages_in_order() { ]; for (sent_at, body) in &requests { - let request = routes::SendRequest { body: body.clone() }; + let request = post::Request { body: body.clone() }; - routes::on_send( + let _ = post::handler( State(app.clone()), Path(channel.id.clone()), sent_at.clone(), sender.clone(), - Json(request.clone()), + Json(request), ) .await - .expect("sending to a valid channel"); + .expect("sending to a valid channel succeeds"); } // Verify the semantics @@ -44,8 +43,8 @@ async fn messages_in_order() { .events() .subscribe(None) .await - .expect("subscribing to a valid channel") - .filter(fixtures::filter::messages()) + .expect("subscribing to a valid channel succeeds") + .filter_map(fixtures::message::events) .take(requests.len()); let events = events.collect::<Vec<_>>().immediately().await; @@ -54,8 +53,8 @@ async fn messages_in_order() { assert_eq!(*sent_at, event.at()); assert!(matches!( event, - event::Event::Message(message::Event::Sent(event)) - if event.message.sender == sender.id + message::Event::Sent(event) + if event.message.sender == sender.login.id && event.message.body == message )); } @@ -66,24 +65,62 @@ async fn nonexistent_channel() { // Set up the environment let app = fixtures::scratch_app().await; - let login = fixtures::login::create(&app, &fixtures::now()).await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; // Call the endpoint let sent_at = fixtures::now(); let channel = channel::Id::generate(); - let request = routes::SendRequest { + let request = post::Request { body: fixtures::message::propose(), }; - let routes::SendErrorResponse(error) = routes::on_send( + let post::Error(error) = post::handler( State(app), Path(channel.clone()), sent_at, - login, + sender, Json(request), ) .await - .expect_err("sending to a nonexistent channel"); + .expect_err("sending to a nonexistent channel fails"); + + // Verify the structure of the response + + assert!(matches!( + error, + SendError::ChannelNotFound(error_channel) if channel == error_channel + )); +} + +#[tokio::test] +async fn deleted_channel() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + + app.channels() + .delete(&channel.id, &fixtures::now()) + .await + .expect("deleting a new channel succeeds"); + + // Call the endpoint + + let sent_at = fixtures::now(); + let channel = channel::Id::generate(); + let request = post::Request { + body: fixtures::message::propose(), + }; + let post::Error(error) = post::handler( + State(app), + Path(channel.clone()), + sent_at, + sender, + Json(request), + ) + .await + .expect_err("sending to a deleted channel fails"); // Verify the structure of the response diff --git a/src/channel/routes/mod.rs b/src/channel/routes/mod.rs new file mode 100644 index 0000000..696bd72 --- /dev/null +++ b/src/channel/routes/mod.rs @@ -0,0 +1,19 @@ +use axum::{ + routing::{delete, post}, + Router, +}; + +use crate::app::App; + +mod channel; +mod post; + +#[cfg(test)] +mod test; + +pub fn router() -> Router<App> { + Router::new() + .route("/api/channels", post(post::handler)) + .route("/api/channels/:channel", post(channel::post::handler)) + .route("/api/channels/:channel", delete(channel::delete::handler)) +} diff --git a/src/channel/routes/post.rs b/src/channel/routes/post.rs new file mode 100644 index 0000000..810445c --- /dev/null +++ b/src/channel/routes/post.rs @@ -0,0 +1,60 @@ +use axum::{ + extract::{Json, State}, + http::StatusCode, + response::{self, IntoResponse}, +}; + +use crate::{ + app::App, + channel::{app, Channel}, + clock::RequestedAt, + error::Internal, + name::Name, + token::extract::Identity, +}; + +pub async fn handler( + State(app): State<App>, + _: Identity, // requires auth, but doesn't actually care who you are + RequestedAt(created_at): RequestedAt, + Json(request): Json<Request>, +) -> Result<Response, Error> { + let channel = app + .channels() + .create(&request.name, &created_at) + .await + .map_err(Error)?; + + Ok(Response(channel)) +} + +#[derive(serde::Deserialize)] +pub struct Request { + pub name: Name, +} + +#[derive(Debug)] +pub struct Response(pub Channel); + +impl IntoResponse for Response { + fn into_response(self) -> response::Response { + let Self(channel) = self; + (StatusCode::ACCEPTED, Json(channel)).into_response() + } +} + +#[derive(Debug)] +pub struct Error(pub app::CreateError); + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + let Self(error) = self; + #[allow(clippy::match_wildcard_for_single_variants)] + match error { + app::CreateError::DuplicateName(_) => { + (StatusCode::CONFLICT, error.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs new file mode 100644 index 0000000..216eba1 --- /dev/null +++ b/src/channel/routes/test.rs @@ -0,0 +1,221 @@ +use std::future; + +use axum::extract::{Json, State}; +use futures::stream::StreamExt as _; + +use super::post; +use crate::{ + channel::app, + name::Name, + test::fixtures::{self, future::Immediately as _}, +}; + +#[tokio::test] +async fn new_channel() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let name = fixtures::channel::propose(); + let request = post::Request { name: name.clone() }; + let post::Response(response) = + post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creating a channel in an empty app succeeds"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + assert!(snapshot.channels.iter().any(|channel| channel == &response)); + + let channel = app + .channels() + .get(&response.id) + .await + .expect("searching for channels by ID never fails") + .expect("the newly-created channel exists"); + assert_eq!(response, channel); + + let mut events = app + .events() + .subscribe(None) + .await + .expect("subscribing never fails") + .filter_map(fixtures::channel::events) + .filter_map(fixtures::channel::created) + .filter(|event| future::ready(event.channel == response)); + + let event = events + .next() + .immediately() + .await + .expect("creation event published"); + + assert_eq!(event.channel, response); +} + +#[tokio::test] +async fn duplicate_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let request = post::Request { + name: channel.name.clone(), + }; + let post::Error(error) = + post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect_err("duplicate channel name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::DuplicateName(name) if channel.name == name + )); +} + +#[tokio::test] +async fn conflicting_canonical_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + + let existing_name = Name::from("rijksmuseum"); + app.channels() + .create(&existing_name, &fixtures::now()) + .await + .expect("creating a channel in an empty environment succeeds"); + + let conflicting_name = Name::from("r\u{0133}ksmuseum"); + + // Call the endpoint + + let request = post::Request { + name: conflicting_name.clone(), + }; + let post::Error(error) = + post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect_err("duplicate channel name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::DuplicateName(name) if conflicting_name == name + )); +} + +#[tokio::test] +async fn name_reusable_after_delete() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let name = fixtures::channel::propose(); + + // Call the endpoint (first time) + + let request = post::Request { name: name.clone() }; + let post::Response(response) = post::handler( + State(app.clone()), + creator.clone(), + fixtures::now(), + Json(request), + ) + .await + .expect("new channel in an empty app"); + + // Delete the channel + + app.channels() + .delete(&response.id, &fixtures::now()) + .await + .expect("deleting a newly-created channel succeeds"); + + // Call the endpoint (second time) + + let request = post::Request { name: name.clone() }; + let post::Response(response) = + post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creation succeeds after original channel deleted"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let channel = app + .channels() + .get(&response.id) + .await + .expect("searching for channels by ID never fails") + .expect("the newly-created channel exists"); + assert_eq!(response, channel); +} + +#[tokio::test] +async fn name_reusable_after_expiry() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::ancient()).await; + let name = fixtures::channel::propose(); + + // Call the endpoint (first time) + + let request = post::Request { name: name.clone() }; + let post::Response(_) = post::handler( + State(app.clone()), + creator.clone(), + fixtures::ancient(), + Json(request), + ) + .await + .expect("new channel in an empty app"); + + // Delete the channel + + app.channels() + .expire(&fixtures::now()) + .await + .expect("expiry always succeeds"); + + // Call the endpoint (second time) + + let request = post::Request { name: name.clone() }; + let post::Response(response) = + post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creation succeeds after original channel expired"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let channel = app + .channels() + .get(&response.id) + .await + .expect("searching for channels by ID never fails") + .expect("the newly-created channel exists"); + assert_eq!(response, channel); +} diff --git a/src/channel/routes/test/mod.rs b/src/channel/routes/test/mod.rs deleted file mode 100644 index 3e5aa17..0000000 --- a/src/channel/routes/test/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod on_create; -mod on_send; diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs deleted file mode 100644 index eeecc7f..0000000 --- a/src/channel/routes/test/on_create.rs +++ /dev/null @@ -1,88 +0,0 @@ -use axum::extract::{Json, State}; -use futures::stream::StreamExt as _; - -use crate::{ - channel::{self, app, routes}, - event, - test::fixtures::{self, future::Immediately as _}, -}; - -#[tokio::test] -async fn new_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::login::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let name = fixtures::channel::propose(); - let request = routes::CreateRequest { name }; - let Json(response_channel) = routes::on_create( - State(app.clone()), - creator, - fixtures::now(), - Json(request.clone()), - ) - .await - .expect("new channel in an empty app"); - - // Verify the structure of the response - - assert_eq!(request.name, response_channel.name); - - // Verify the semantics - - let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - assert!(snapshot - .channels - .iter() - .any(|channel| channel.name == response_channel.name && channel.id == response_channel.id)); - - let mut events = app - .events() - .subscribe(None) - .await - .expect("subscribing never fails") - .filter(fixtures::filter::created()); - - let event = events - .next() - .immediately() - .await - .expect("creation event published"); - - assert!(matches!( - event, - event::Event::Channel(channel::Event::Created(event)) - if event.channel == response_channel - )); -} - -#[tokio::test] -async fn duplicate_name() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::login::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let request = routes::CreateRequest { name: channel.name }; - let routes::CreateError(error) = routes::on_create( - State(app.clone()), - creator, - fixtures::now(), - Json(request.clone()), - ) - .await - .expect_err("duplicate channel name"); - - // Verify the structure of the response - - assert!(matches!( - error, - app::CreateError::DuplicateName(name) if request.name == name - )); -} diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs index d4d1d27..129c0d6 100644 --- a/src/channel/snapshot.rs +++ b/src/channel/snapshot.rs @@ -2,11 +2,14 @@ use super::{ event::{Created, Event}, Id, }; +use crate::{clock::DateTime, name::Name}; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Channel { pub id: Id, - pub name: String, + pub name: Name, + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_at: Option<DateTime>, } impl Channel { |
