diff options
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 224 | ||||
| -rw-r--r-- | src/channel/event.rs | 46 | ||||
| -rw-r--r-- | src/channel/handlers/create/mod.rs | 67 | ||||
| -rw-r--r-- | src/channel/handlers/create/test.rs | 250 | ||||
| -rw-r--r-- | src/channel/handlers/delete/mod.rs | 59 | ||||
| -rw-r--r-- | src/channel/handlers/delete/test.rs | 184 | ||||
| -rw-r--r-- | src/channel/handlers/mod.rs | 9 | ||||
| -rw-r--r-- | src/channel/handlers/send/mod.rs | 63 | ||||
| -rw-r--r-- | src/channel/handlers/send/test.rs | 130 | ||||
| -rw-r--r-- | src/channel/history.rs | 69 | ||||
| -rw-r--r-- | src/channel/id.rs | 38 | ||||
| -rw-r--r-- | src/channel/mod.rs | 10 | ||||
| -rw-r--r-- | src/channel/repo.rs | 336 | ||||
| -rw-r--r-- | src/channel/snapshot.rs | 43 | ||||
| -rw-r--r-- | src/channel/validate.rs | 25 |
15 files changed, 0 insertions, 1553 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs deleted file mode 100644 index e3b169c..0000000 --- a/src/channel/app.rs +++ /dev/null @@ -1,224 +0,0 @@ -use chrono::TimeDelta; -use itertools::Itertools; -use sqlx::sqlite::SqlitePool; - -use super::{ - Channel, Id, - repo::{LoadError, Provider as _}, - validate, -}; -use crate::{ - clock::DateTime, - db::{Duplicate as _, NotFound as _}, - event::{Broadcaster, Event, Sequence, repo::Provider as _}, - message::{self, repo::Provider as _}, - name::{self, Name}, -}; - -pub struct Channels<'a> { - db: &'a SqlitePool, - events: &'a Broadcaster, -} - -impl<'a> Channels<'a> { - pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { - Self { db, events } - } - - pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> { - if !validate::name(name) { - return Err(CreateError::InvalidName(name.clone())); - } - - let mut tx = self.db.begin().await?; - let created = tx.sequence().next(created_at).await?; - let channel = tx - .channels() - .create(name, &created) - .await - .duplicate(|| CreateError::DuplicateName(name.clone()))?; - tx.commit().await?; - - self.events - .broadcast(channel.events().map(Event::from).collect::<Vec<_>>()); - - Ok(channel.as_created()) - } - - // This function is careless with respect to time, and gets you the channel as - // it exists in the specific moment when you call it. - pub async fn get(&self, channel: &Id) -> Result<Channel, Error> { - let to_not_found = || Error::NotFound(channel.clone()); - let to_deleted = || Error::Deleted(channel.clone()); - - let mut tx = self.db.begin().await?; - let channel = tx.channels().by_id(channel).await.not_found(to_not_found)?; - tx.commit().await?; - - channel.as_snapshot().ok_or_else(to_deleted) - } - - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { - let mut tx = self.db.begin().await?; - - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| DeleteError::NotFound(channel.clone()))?; - channel - .as_snapshot() - .ok_or_else(|| DeleteError::Deleted(channel.id().clone()))?; - - let mut events = Vec::new(); - - let messages = tx.messages().live(&channel).await?; - let has_messages = messages - .iter() - .map(message::History::as_snapshot) - .any(|message| message.is_some()); - if has_messages { - return Err(DeleteError::NotEmpty(channel.id().clone())); - } - - let deleted = tx.sequence().next(deleted_at).await?; - let channel = tx.channels().delete(&channel, &deleted).await?; - events.extend( - channel - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from), - ); - - tx.commit().await?; - - self.events.broadcast(events); - - Ok(()) - } - - pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> { - // Somewhat arbitrarily, expire after 7 days. Active channels will not be - // expired until their messages expire. - let expire_at = relative_to.to_owned() - TimeDelta::days(7); - - let mut tx = self.db.begin().await?; - let expired = tx.channels().expired(&expire_at).await?; - - let mut events = Vec::with_capacity(expired.len()); - for channel in expired { - let deleted = tx.sequence().next(relative_to).await?; - let channel = tx.channels().delete(&channel, &deleted).await?; - events.push( - channel - .events() - .filter(Sequence::start_from(deleted.sequence)), - ); - } - - tx.commit().await?; - - self.events.broadcast( - events - .into_iter() - .kmerge_by(Sequence::merge) - .map(Event::from) - .collect::<Vec<_>>(), - ); - - Ok(()) - } - - pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { - // Somewhat arbitrarily, purge after 6 hours. - let purge_at = relative_to.to_owned() - TimeDelta::hours(6); - - let mut tx = self.db.begin().await?; - tx.channels().purge(&purge_at).await?; - tx.commit().await?; - - Ok(()) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum CreateError { - #[error("channel named {0} already exists")] - DuplicateName(Name), - #[error("invalid channel name: {0}")] - InvalidName(Name), - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From<LoadError> for CreateError { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("channel {0} not found")] - NotFound(Id), - #[error("channel {0} deleted")] - Deleted(Id), - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From<LoadError> for Error { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum DeleteError { - #[error("channel {0} not found")] - NotFound(Id), - #[error("channel {0} deleted")] - Deleted(Id), - #[error("channel {0} not empty")] - NotEmpty(Id), - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From<LoadError> for DeleteError { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum ExpireError { - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From<LoadError> for ExpireError { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} diff --git a/src/channel/event.rs b/src/channel/event.rs deleted file mode 100644 index a5739f9..0000000 --- a/src/channel/event.rs +++ /dev/null @@ -1,46 +0,0 @@ -use super::Channel; -use crate::{ - channel, - event::{Instant, Sequenced}, -}; - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -#[serde(tag = "event", rename_all = "snake_case")] -pub enum Event { - Created(Created), - Deleted(Deleted), -} - -impl Sequenced for Event { - fn instant(&self) -> Instant { - match self { - Self::Created(event) => event.channel.created, - Self::Deleted(event) => event.instant, - } - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Created { - #[serde(flatten)] - pub channel: Channel, -} - -impl From<Created> for Event { - fn from(event: Created) -> Self { - Self::Created(event) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Deleted { - #[serde(flatten)] - pub instant: Instant, - pub id: channel::Id, -} - -impl From<Deleted> for Event { - fn from(event: Deleted) -> Self { - Self::Deleted(event) - } -} diff --git a/src/channel/handlers/create/mod.rs b/src/channel/handlers/create/mod.rs deleted file mode 100644 index 2c860fc..0000000 --- a/src/channel/handlers/create/mod.rs +++ /dev/null @@ -1,67 +0,0 @@ -use axum::{ - extract::{Json, State}, - http::StatusCode, - response::{self, IntoResponse}, -}; - -use crate::{ - app::App, - channel::{Channel, app}, - clock::RequestedAt, - error::Internal, - name::Name, - token::extract::Identity, -}; - -#[cfg(test)] -mod test; - -pub async fn handler( - State(app): State<App>, - _: Identity, // requires auth, but doesn't actually care who you are - RequestedAt(created_at): RequestedAt, - Json(request): Json<Request>, -) -> Result<Response, Error> { - let channel = app - .channels() - .create(&request.name, &created_at) - .await - .map_err(Error)?; - - Ok(Response(channel)) -} - -#[derive(serde::Deserialize)] -pub struct Request { - pub name: Name, -} - -#[derive(Debug)] -pub struct Response(pub Channel); - -impl IntoResponse for Response { - fn into_response(self) -> response::Response { - let Self(channel) = self; - (StatusCode::ACCEPTED, Json(channel)).into_response() - } -} - -#[derive(Debug)] -pub struct Error(pub app::CreateError); - -impl IntoResponse for Error { - fn into_response(self) -> response::Response { - let Self(error) = self; - match error { - app::CreateError::DuplicateName(_) => { - (StatusCode::CONFLICT, error.to_string()).into_response() - } - app::CreateError::InvalidName(_) => { - (StatusCode::BAD_REQUEST, error.to_string()).into_response() - } - app::CreateError::Name(_) | app::CreateError::Database(_) => { - Internal::from(error).into_response() - } - } - } -} diff --git a/src/channel/handlers/create/test.rs b/src/channel/handlers/create/test.rs deleted file mode 100644 index 31bb778..0000000 --- a/src/channel/handlers/create/test.rs +++ /dev/null @@ -1,250 +0,0 @@ -use std::future; - -use axum::extract::{Json, State}; -use futures::stream::StreamExt as _; -use itertools::Itertools; - -use crate::{ - channel::app, - name::Name, - test::fixtures::{self, future::Expect as _}, -}; - -#[tokio::test] -async fn new_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::identity::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Call the endpoint - - let name = fixtures::channel::propose(); - let request = super::Request { name: name.clone() }; - let super::Response(response) = - super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) - .await - .expect("creating a channel in an empty app succeeds"); - - // Verify the structure of the response - - assert_eq!(name, response.name); - - // Verify the semantics - - let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - let created = snapshot - .events - .into_iter() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) - .exactly_one() - .expect("only one channel has been created"); - assert_eq!(response, created.channel); - - let channel = app - .channels() - .get(&response.id) - .await - .expect("the newly-created channel exists"); - assert_eq!(response, channel); - - let mut events = app - .events() - .subscribe(resume_point) - .await - .expect("subscribing never fails") - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::created) - .filter(|event| future::ready(event.channel == response)); - - let event = events.next().expect_some("creation event published").await; - - assert_eq!(event.channel, response); -} - -#[tokio::test] -async fn duplicate_name() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let request = super::Request { - name: channel.name.clone(), - }; - let super::Error(error) = - super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) - .await - .expect_err("duplicate channel name should fail the request"); - - // Verify the structure of the response - - assert!(matches!( - error, - app::CreateError::DuplicateName(name) if channel.name == name - )); -} - -#[tokio::test] -async fn conflicting_canonical_name() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::identity::create(&app, &fixtures::now()).await; - - let existing_name = Name::from("rijksmuseum"); - app.channels() - .create(&existing_name, &fixtures::now()) - .await - .expect("creating a channel in an empty environment succeeds"); - - let conflicting_name = Name::from("r\u{0133}ksmuseum"); - - // Call the endpoint - - let request = super::Request { - name: conflicting_name.clone(), - }; - let super::Error(error) = - super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) - .await - .expect_err("duplicate channel name should fail the request"); - - // Verify the structure of the response - - assert!(matches!( - error, - app::CreateError::DuplicateName(name) if conflicting_name == name - )); -} - -#[tokio::test] -async fn invalid_name() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::identity::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let name = fixtures::channel::propose_invalid_name(); - let request = super::Request { name: name.clone() }; - let super::Error(error) = crate::channel::handlers::create::handler( - State(app.clone()), - creator, - fixtures::now(), - Json(request), - ) - .await - .expect_err("invalid channel name should fail the request"); - - // Verify the structure of the response - - assert!(matches!( - error, - app::CreateError::InvalidName(error_name) if name == error_name - )); -} - -#[tokio::test] -async fn name_reusable_after_delete() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::identity::create(&app, &fixtures::now()).await; - let name = fixtures::channel::propose(); - - // Call the endpoint (first time) - - let request = super::Request { name: name.clone() }; - let super::Response(response) = super::handler( - State(app.clone()), - creator.clone(), - fixtures::now(), - Json(request), - ) - .await - .expect("new channel in an empty app"); - - // Delete the channel - - app.channels() - .delete(&response.id, &fixtures::now()) - .await - .expect("deleting a newly-created channel succeeds"); - - // Call the endpoint (second time) - - let request = super::Request { name: name.clone() }; - let super::Response(response) = - super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) - .await - .expect("creation succeeds after original channel deleted"); - - // Verify the structure of the response - - assert_eq!(name, response.name); - - // Verify the semantics - - let channel = app - .channels() - .get(&response.id) - .await - .expect("the newly-created channel exists"); - assert_eq!(response, channel); -} - -#[tokio::test] -async fn name_reusable_after_expiry() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let creator = fixtures::identity::create(&app, &fixtures::ancient()).await; - let name = fixtures::channel::propose(); - - // Call the endpoint (first time) - - let request = super::Request { name: name.clone() }; - let super::Response(_) = super::handler( - State(app.clone()), - creator.clone(), - fixtures::ancient(), - Json(request), - ) - .await - .expect("new channel in an empty app"); - - // Delete the channel - - app.channels() - .expire(&fixtures::now()) - .await - .expect("expiry always succeeds"); - - // Call the endpoint (second time) - - let request = super::Request { name: name.clone() }; - let super::Response(response) = - super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) - .await - .expect("creation succeeds after original channel expired"); - - // Verify the structure of the response - - assert_eq!(name, response.name); - - // Verify the semantics - - let channel = app - .channels() - .get(&response.id) - .await - .expect("the newly-created channel exists"); - assert_eq!(response, channel); -} diff --git a/src/channel/handlers/delete/mod.rs b/src/channel/handlers/delete/mod.rs deleted file mode 100644 index b986bec..0000000 --- a/src/channel/handlers/delete/mod.rs +++ /dev/null @@ -1,59 +0,0 @@ -use axum::{ - extract::{Json, Path, State}, - http::StatusCode, - response::{self, IntoResponse}, -}; - -use crate::{ - app::App, - channel::{self, app, handlers::PathInfo}, - clock::RequestedAt, - error::{Internal, NotFound}, - token::extract::Identity, -}; - -#[cfg(test)] -mod test; - -pub async fn handler( - State(app): State<App>, - Path(channel): Path<PathInfo>, - RequestedAt(deleted_at): RequestedAt, - _: Identity, -) -> Result<Response, Error> { - app.channels().delete(&channel, &deleted_at).await?; - - Ok(Response { id: channel }) -} - -#[derive(Debug, serde::Serialize)] -pub struct Response { - pub id: channel::Id, -} - -impl IntoResponse for Response { - fn into_response(self) -> response::Response { - (StatusCode::ACCEPTED, Json(self)).into_response() - } -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub struct Error(#[from] pub app::DeleteError); - -impl IntoResponse for Error { - fn into_response(self) -> response::Response { - let Self(error) = self; - match error { - app::DeleteError::NotFound(_) | app::DeleteError::Deleted(_) => { - NotFound(error).into_response() - } - app::DeleteError::NotEmpty(_) => { - (StatusCode::CONFLICT, error.to_string()).into_response() - } - app::DeleteError::Name(_) | app::DeleteError::Database(_) => { - Internal::from(error).into_response() - } - } - } -} diff --git a/src/channel/handlers/delete/test.rs b/src/channel/handlers/delete/test.rs deleted file mode 100644 index 99c19db..0000000 --- a/src/channel/handlers/delete/test.rs +++ /dev/null @@ -1,184 +0,0 @@ -use axum::extract::{Path, State}; -use itertools::Itertools; - -use crate::{channel::app, test::fixtures}; - -#[tokio::test] -pub async fn valid_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - // Send the request - - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let response = super::handler( - State(app.clone()), - Path(channel.id.clone()), - fixtures::now(), - deleter, - ) - .await - .expect("deleting a valid channel succeeds"); - - // Verify the response - - assert_eq!(channel.id, response.id); - - // Verify the semantics - - let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - let created = snapshot - .events - .into_iter() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) - .exactly_one() - .expect("only one channel has been created"); - // We don't expect `channel` to match the event exactly, as the name will have been - // tombstoned and the channel given a `deleted_at` date. - assert_eq!(channel.id, created.channel.id); -} - -#[tokio::test] -pub async fn invalid_channel_id() { - // Set up the environment - - let app = fixtures::scratch_app().await; - - // Send the request - - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::fictitious(); - let super::Error(error) = super::handler( - State(app.clone()), - Path(channel.clone()), - fixtures::now(), - deleter, - ) - .await - .expect_err("deleting a nonexistent channel fails"); - - // Verify the response - - assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel)); -} - -#[tokio::test] -pub async fn channel_deleted() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - app.channels() - .delete(&channel.id, &fixtures::now()) - .await - .expect("deleting a recently-sent channel succeeds"); - - // Send the request - - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Error(error) = super::handler( - State(app.clone()), - Path(channel.id.clone()), - fixtures::now(), - deleter, - ) - .await - .expect_err("deleting a deleted channel fails"); - - // Verify the response - - assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id)); -} - -#[tokio::test] -pub async fn channel_expired() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - - app.channels() - .expire(&fixtures::now()) - .await - .expect("expiring channels always succeeds"); - - // Send the request - - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Error(error) = super::handler( - State(app.clone()), - Path(channel.id.clone()), - fixtures::now(), - deleter, - ) - .await - .expect_err("deleting an expired channel fails"); - - // Verify the response - - assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id)); -} - -#[tokio::test] -pub async fn channel_purged() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - - app.channels() - .expire(&fixtures::old()) - .await - .expect("expiring channels always succeeds"); - - app.channels() - .purge(&fixtures::now()) - .await - .expect("purging channels always succeeds"); - - // Send the request - - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Error(error) = super::handler( - State(app.clone()), - Path(channel.id.clone()), - fixtures::now(), - deleter, - ) - .await - .expect_err("deleting a purged channel fails"); - - // Verify the response - - assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel.id)); -} - -#[tokio::test] -pub async fn channel_not_empty() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; - - // Send the request - - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let super::Error(error) = super::handler( - State(app.clone()), - Path(channel.id.clone()), - fixtures::now(), - deleter, - ) - .await - .expect_err("deleting a channel with messages fails"); - - // Verify the response - - assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == channel.id)); -} diff --git a/src/channel/handlers/mod.rs b/src/channel/handlers/mod.rs deleted file mode 100644 index f2ffd0d..0000000 --- a/src/channel/handlers/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -mod create; -mod delete; -mod send; - -pub use create::handler as create; -pub use delete::handler as delete; -pub use send::handler as send; - -type PathInfo = crate::channel::Id; diff --git a/src/channel/handlers/send/mod.rs b/src/channel/handlers/send/mod.rs deleted file mode 100644 index bde39e5..0000000 --- a/src/channel/handlers/send/mod.rs +++ /dev/null @@ -1,63 +0,0 @@ -use axum::{ - extract::{Json, Path, State}, - http::StatusCode, - response::{self, IntoResponse}, -}; - -use crate::channel::handlers::PathInfo; -use crate::{ - app::App, - clock::RequestedAt, - error::{Internal, NotFound}, - message::{Body, Message, app::SendError}, - token::extract::Identity, -}; - -#[cfg(test)] -mod test; - -pub async fn handler( - State(app): State<App>, - Path(channel): Path<PathInfo>, - RequestedAt(sent_at): RequestedAt, - identity: Identity, - Json(request): Json<Request>, -) -> Result<Response, Error> { - let message = app - .messages() - .send(&channel, &identity.user, &sent_at, &request.body) - .await?; - - Ok(Response(message)) -} - -#[derive(serde::Deserialize)] -pub struct Request { - pub body: Body, -} - -#[derive(Debug)] -pub struct Response(pub Message); - -impl IntoResponse for Response { - fn into_response(self) -> response::Response { - let Self(message) = self; - (StatusCode::ACCEPTED, Json(message)).into_response() - } -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub struct Error(#[from] pub SendError); - -impl IntoResponse for Error { - fn into_response(self) -> response::Response { - let Self(error) = self; - match error { - SendError::ChannelNotFound(_) | SendError::ChannelDeleted(_) => { - NotFound(error).into_response() - } - SendError::Name(_) | SendError::Database(_) => Internal::from(error).into_response(), - } - } -} diff --git a/src/channel/handlers/send/test.rs b/src/channel/handlers/send/test.rs deleted file mode 100644 index 70d45eb..0000000 --- a/src/channel/handlers/send/test.rs +++ /dev/null @@ -1,130 +0,0 @@ -use axum::extract::{Json, Path, State}; -use futures::stream::{self, StreamExt as _}; - -use crate::{ - channel, - event::Sequenced, - message::app::SendError, - test::fixtures::{self, future::Expect as _}, -}; - -#[tokio::test] -async fn messages_in_order() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let resume_point = fixtures::boot::resume_point(&app).await; - - // Call the endpoint (twice) - - let requests = vec![ - (fixtures::now(), fixtures::message::propose()), - (fixtures::now(), fixtures::message::propose()), - ]; - - for (sent_at, body) in &requests { - let request = super::Request { body: body.clone() }; - - let _ = super::handler( - State(app.clone()), - Path(channel.id.clone()), - sent_at.clone(), - sender.clone(), - Json(request), - ) - .await - .expect("sending to a valid channel succeeds"); - } - - // Verify the semantics - - let mut events = app - .events() - .subscribe(resume_point) - .await - .expect("subscribing to a valid channel succeeds") - .filter_map(fixtures::event::stream::message) - .filter_map(fixtures::event::stream::message::sent) - .zip(stream::iter(requests)); - - while let Some((event, (sent_at, body))) = events - .next() - .expect_ready("an event should be ready for each message") - .await - { - assert_eq!(*sent_at, event.at()); - assert_eq!(sender.user.id, event.message.sender); - assert_eq!(body, event.message.body); - } -} - -#[tokio::test] -async fn nonexistent_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::identity::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let sent_at = fixtures::now(); - let channel = channel::Id::generate(); - let request = super::Request { - body: fixtures::message::propose(), - }; - let super::Error(error) = super::handler( - State(app), - Path(channel.clone()), - sent_at, - sender, - Json(request), - ) - .await - .expect_err("sending to a nonexistent channel fails"); - - // Verify the structure of the response - - assert!(matches!( - error, - SendError::ChannelNotFound(error_channel) if channel == error_channel - )); -} - -#[tokio::test] -async fn deleted_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - app.channels() - .delete(&channel.id, &fixtures::now()) - .await - .expect("deleting a new channel succeeds"); - - // Call the endpoint - - let sent_at = fixtures::now(); - let request = super::Request { - body: fixtures::message::propose(), - }; - let super::Error(error) = super::handler( - State(app), - Path(channel.id.clone()), - sent_at, - sender, - Json(request), - ) - .await - .expect_err("sending to a deleted channel fails"); - - // Verify the structure of the response - - assert!(matches!( - error, - SendError::ChannelDeleted(error_channel) if channel.id == error_channel - )); -} diff --git a/src/channel/history.rs b/src/channel/history.rs deleted file mode 100644 index 85da5a5..0000000 --- a/src/channel/history.rs +++ /dev/null @@ -1,69 +0,0 @@ -use itertools::Itertools as _; - -use super::{ - Channel, Id, - event::{Created, Deleted, Event}, -}; -use crate::event::{Instant, Sequence}; - -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct History { - pub channel: Channel, - pub deleted: Option<Instant>, -} - -// State interface -impl History { - pub fn id(&self) -> &Id { - &self.channel.id - } - - // Snapshot of this channel as it was when created. (Note to the future: it's - // okay if this returns a redacted or modified version of the channel. If we - // implement renames by redacting the original name, then this should return the - // renamed channel, not the original, even if that's not how it was "as - // created.") - pub fn as_created(&self) -> Channel { - self.channel.clone() - } - - pub fn as_of<S>(&self, sequence: S) -> Option<Channel> - where - S: Into<Sequence>, - { - self.events() - .filter(Sequence::up_to(sequence.into())) - .collect() - } - - // Snapshot of this channel as of all events recorded in this history. - pub fn as_snapshot(&self) -> Option<Channel> { - self.events().collect() - } -} - -// Event factories -impl History { - pub fn events(&self) -> impl Iterator<Item = Event> + use<> { - [self.created()] - .into_iter() - .merge_by(self.deleted(), Sequence::merge) - } - - fn created(&self) -> Event { - Created { - channel: self.channel.clone(), - } - .into() - } - - fn deleted(&self) -> Option<Event> { - self.deleted.map(|instant| { - Deleted { - instant, - id: self.channel.id.clone(), - } - .into() - }) - } -} diff --git a/src/channel/id.rs b/src/channel/id.rs deleted file mode 100644 index 22a2700..0000000 --- a/src/channel/id.rs +++ /dev/null @@ -1,38 +0,0 @@ -use std::fmt; - -use crate::id::Id as BaseId; - -// Stable identifier for a [Channel]. Prefixed with `C`. -#[derive( - Clone, - Debug, - Eq, - Hash, - Ord, - PartialEq, - PartialOrd, - sqlx::Type, - serde::Deserialize, - serde::Serialize, -)] -#[sqlx(transparent)] -#[serde(transparent)] -pub struct Id(BaseId); - -impl From<BaseId> for Id { - fn from(id: BaseId) -> Self { - Self(id) - } -} - -impl Id { - pub fn generate() -> Self { - BaseId::generate("C") - } -} - -impl fmt::Display for Id { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} diff --git a/src/channel/mod.rs b/src/channel/mod.rs deleted file mode 100644 index bbaf33e..0000000 --- a/src/channel/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -pub mod app; -pub mod event; -pub mod handlers; -mod history; -mod id; -pub mod repo; -mod snapshot; -mod validate; - -pub use self::{event::Event, history::History, id::Id, snapshot::Channel}; diff --git a/src/channel/repo.rs b/src/channel/repo.rs deleted file mode 100644 index fd2173a..0000000 --- a/src/channel/repo.rs +++ /dev/null @@ -1,336 +0,0 @@ -use futures::stream::{StreamExt as _, TryStreamExt as _}; -use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; - -use crate::{ - channel::{Channel, History, Id}, - clock::DateTime, - db::NotFound, - event::{Instant, Sequence}, - name::{self, Name}, -}; - -pub trait Provider { - fn channels(&mut self) -> Channels; -} - -impl Provider for Transaction<'_, Sqlite> { - fn channels(&mut self) -> Channels { - Channels(self) - } -} - -pub struct Channels<'t>(&'t mut SqliteConnection); - -impl Channels<'_> { - pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> { - let id = Id::generate(); - let name = name.clone(); - let display_name = name.display(); - let canonical_name = name.canonical(); - let created = *created; - - sqlx::query!( - r#" - insert into conversation (id, created_at, created_sequence, last_sequence) - values ($1, $2, $3, $4) - "#, - id, - created.at, - created.sequence, - created.sequence, - ) - .execute(&mut *self.0) - .await?; - - sqlx::query!( - r#" - insert into conversation_name (id, display_name, canonical_name) - values ($1, $2, $3) - "#, - id, - display_name, - canonical_name, - ) - .execute(&mut *self.0) - .await?; - - let channel = History { - channel: Channel { - created, - id, - name: name.clone(), - deleted_at: None, - }, - deleted: None, - }; - - Ok(channel) - } - - pub async fn by_id(&mut self, channel: &Id) -> Result<History, LoadError> { - let channel = sqlx::query!( - r#" - select - id as "id: Id", - name.display_name as "display_name?: String", - name.canonical_name as "canonical_name?: String", - conversation.created_at as "created_at: DateTime", - conversation.created_sequence as "created_sequence: Sequence", - deleted.deleted_at as "deleted_at?: DateTime", - deleted.deleted_sequence as "deleted_sequence?: Sequence" - from conversation - left join conversation_name as name - using (id) - left join conversation_deleted as deleted - using (id) - where id = $1 - "#, - channel, - ) - .map(|row| { - Ok::<_, name::Error>(History { - channel: Channel { - created: Instant::new(row.created_at, row.created_sequence), - id: row.id, - name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), - deleted_at: row.deleted_at, - }, - deleted: Instant::optional(row.deleted_at, row.deleted_sequence), - }) - }) - .fetch_one(&mut *self.0) - .await??; - - Ok(channel) - } - - pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { - let channels = sqlx::query!( - r#" - select - id as "id: Id", - name.display_name as "display_name?: String", - name.canonical_name as "canonical_name?: String", - conversation.created_at as "created_at: DateTime", - conversation.created_sequence as "created_sequence: Sequence", - deleted.deleted_at as "deleted_at?: DateTime", - deleted.deleted_sequence as "deleted_sequence?: Sequence" - from conversation - left join conversation_name as name - using (id) - left join conversation_deleted as deleted - using (id) - where conversation.created_sequence <= $1 - order by name.canonical_name - "#, - resume_at, - ) - .map(|row| { - Ok::<_, name::Error>(History { - channel: Channel { - created: Instant::new(row.created_at, row.created_sequence), - id: row.id, - name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), - deleted_at: row.deleted_at, - }, - deleted: Instant::optional(row.deleted_at, row.deleted_sequence), - }) - }) - .fetch(&mut *self.0) - .map(|res| Ok::<_, LoadError>(res??)) - .try_collect() - .await?; - - Ok(channels) - } - - pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { - let channels = sqlx::query!( - r#" - select - id as "id: Id", - name.display_name as "display_name?: String", - name.canonical_name as "canonical_name?: String", - conversation.created_at as "created_at: DateTime", - conversation.created_sequence as "created_sequence: Sequence", - deleted.deleted_at as "deleted_at?: DateTime", - deleted.deleted_sequence as "deleted_sequence?: Sequence" - from conversation - left join conversation_name as name - using (id) - left join conversation_deleted as deleted - using (id) - where conversation.last_sequence > $1 - "#, - resume_at, - ) - .map(|row| { - Ok::<_, name::Error>(History { - channel: Channel { - created: Instant::new(row.created_at, row.created_sequence), - id: row.id, - name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), - deleted_at: row.deleted_at, - }, - deleted: Instant::optional(row.deleted_at, row.deleted_sequence), - }) - }) - .fetch(&mut *self.0) - .map(|res| Ok::<_, LoadError>(res??)) - .try_collect() - .await?; - - Ok(channels) - } - - pub async fn delete( - &mut self, - channel: &History, - deleted: &Instant, - ) -> Result<History, LoadError> { - let id = channel.id(); - sqlx::query!( - r#" - update conversation - set last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - id, - ) - .fetch_one(&mut *self.0) - .await?; - - sqlx::query!( - r#" - insert into conversation_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a conversation is deleted, its - // name is retconned to have been the empty string. Someone reading the event - // stream afterwards, or looking at channels via the API, cannot retrieve the - // "deleted" channel's information by ignoring the deletion event. - // - // This also avoids the need for a separate name reservation table to ensure - // that live channels have unique names, since the `channel` table's name field - // is unique over non-null values. - sqlx::query!( - r#" - delete from conversation_name - where id = $1 - "#, - id, - ) - .execute(&mut *self.0) - .await?; - - let channel = self.by_id(id).await?; - - Ok(channel) - } - - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { - let channels = sqlx::query_scalar!( - r#" - with has_messages as ( - select conversation - from message - group by conversation - ) - delete from conversation_deleted - where deleted_at < $1 - and id not in has_messages - returning id as "id: Id" - "#, - purge_at, - ) - .fetch_all(&mut *self.0) - .await?; - - for channel in channels { - // Wanted: a way to batch these up into one query. - sqlx::query!( - r#" - delete from conversation - where id = $1 - "#, - channel, - ) - .execute(&mut *self.0) - .await?; - } - - Ok(()) - } - - pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> { - let channels = sqlx::query!( - r#" - select - conversation.id as "id: Id", - name.display_name as "display_name?: String", - name.canonical_name as "canonical_name?: String", - conversation.created_at as "created_at: DateTime", - conversation.created_sequence as "created_sequence: Sequence", - deleted.deleted_at as "deleted_at?: DateTime", - deleted.deleted_sequence as "deleted_sequence?: Sequence" - from conversation - left join conversation_name as name - using (id) - left join conversation_deleted as deleted - using (id) - left join message - on conversation.id = message.conversation - where conversation.created_at < $1 - and message.id is null - and deleted.id is null - "#, - expired_at, - ) - .map(|row| { - Ok::<_, name::Error>(History { - channel: Channel { - created: Instant::new(row.created_at, row.created_sequence), - id: row.id, - name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), - deleted_at: row.deleted_at, - }, - deleted: Instant::optional(row.deleted_at, row.deleted_sequence), - }) - }) - .fetch(&mut *self.0) - .map(|res| Ok::<_, LoadError>(res??)) - .try_collect() - .await?; - - Ok(channels) - } -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum LoadError { - Database(#[from] sqlx::Error), - Name(#[from] name::Error), -} - -impl<T> NotFound for Result<T, LoadError> { - type Ok = T; - type Error = LoadError; - - fn optional(self) -> Result<Option<T>, LoadError> { - match self { - Ok(value) => Ok(Some(value)), - Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None), - Err(other) => Err(other), - } - } -} diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs deleted file mode 100644 index 96801b8..0000000 --- a/src/channel/snapshot.rs +++ /dev/null @@ -1,43 +0,0 @@ -use super::{ - Id, - event::{Created, Event}, -}; -use crate::{clock::DateTime, event::Instant, name::Name}; - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Channel { - #[serde(flatten)] - pub created: Instant, - pub id: Id, - pub name: Name, - #[serde(skip_serializing_if = "Option::is_none")] - pub deleted_at: Option<DateTime>, -} - -impl Channel { - fn apply(state: Option<Self>, event: Event) -> Option<Self> { - match (state, event) { - (None, Event::Created(event)) => Some(event.into()), - (Some(channel), Event::Deleted(event)) if channel.id == event.id => None, - (state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"), - } - } -} - -impl FromIterator<Event> for Option<Channel> { - fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self { - events.into_iter().fold(None, Channel::apply) - } -} - -impl From<&Created> for Channel { - fn from(event: &Created) -> Self { - event.channel.clone() - } -} - -impl From<Created> for Channel { - fn from(event: Created) -> Self { - event.channel - } -} diff --git a/src/channel/validate.rs b/src/channel/validate.rs deleted file mode 100644 index 7894e0c..0000000 --- a/src/channel/validate.rs +++ /dev/null @@ -1,25 +0,0 @@ -use std::ops::Not as _; - -use unicode_segmentation::UnicodeSegmentation as _; - -use crate::name::Name; - -// Picked out of a hat. The power of two is not meaningful. -const NAME_TOO_LONG: usize = 64; - -pub fn name(name: &Name) -> bool { - let display = name.display(); - - [ - display.graphemes(true).count() < NAME_TOO_LONG, - display.chars().any(char::is_control).not(), - display.chars().next().is_some_and(|c| !c.is_whitespace()), - display.chars().last().is_some_and(|c| !c.is_whitespace()), - display - .chars() - .zip(display.chars().skip(1)) - .all(|(a, b)| !(a.is_whitespace() && b.is_whitespace())), - ] - .into_iter() - .all(|value| value) -} |
