diff options
Diffstat (limited to 'src/conversation')
| -rw-r--r-- | src/conversation/app.rs | 236 | ||||
| -rw-r--r-- | src/conversation/event.rs | 46 | ||||
| -rw-r--r-- | src/conversation/handlers/create/mod.rs | 67 | ||||
| -rw-r--r-- | src/conversation/handlers/create/test.rs | 250 | ||||
| -rw-r--r-- | src/conversation/handlers/delete/mod.rs | 61 | ||||
| -rw-r--r-- | src/conversation/handlers/delete/test.rs | 184 | ||||
| -rw-r--r-- | src/conversation/handlers/mod.rs | 9 | ||||
| -rw-r--r-- | src/conversation/handlers/send/mod.rs | 63 | ||||
| -rw-r--r-- | src/conversation/handlers/send/test.rs | 130 | ||||
| -rw-r--r-- | src/conversation/history.rs | 69 | ||||
| -rw-r--r-- | src/conversation/id.rs | 38 | ||||
| -rw-r--r-- | src/conversation/mod.rs | 10 | ||||
| -rw-r--r-- | src/conversation/repo.rs | 332 | ||||
| -rw-r--r-- | src/conversation/snapshot.rs | 43 | ||||
| -rw-r--r-- | src/conversation/validate.rs | 25 |
15 files changed, 1563 insertions, 0 deletions
diff --git a/src/conversation/app.rs b/src/conversation/app.rs new file mode 100644 index 0000000..81ccdcf --- /dev/null +++ b/src/conversation/app.rs @@ -0,0 +1,236 @@ +use chrono::TimeDelta; +use itertools::Itertools; +use sqlx::sqlite::SqlitePool; + +use super::{ + Conversation, Id, + repo::{LoadError, Provider as _}, + validate, +}; +use crate::{ + clock::DateTime, + db::{Duplicate as _, NotFound as _}, + event::{Broadcaster, Event, Sequence, repo::Provider as _}, + message::{self, repo::Provider as _}, + name::{self, Name}, +}; + +pub struct Conversations<'a> { + db: &'a SqlitePool, + events: &'a Broadcaster, +} + +impl<'a> Conversations<'a> { + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } + } + + pub async fn create( + &self, + name: &Name, + created_at: &DateTime, + ) -> Result<Conversation, CreateError> { + if !validate::name(name) { + return Err(CreateError::InvalidName(name.clone())); + } + + let mut tx = self.db.begin().await?; + let created = tx.sequence().next(created_at).await?; + let conversation = tx + .conversations() + .create(name, &created) + .await + .duplicate(|| CreateError::DuplicateName(name.clone()))?; + tx.commit().await?; + + self.events + .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>()); + + Ok(conversation.as_created()) + } + + // This function is careless with respect to time, and gets you the + // conversation as it exists in the specific moment when you call it. + pub async fn get(&self, conversation: &Id) -> Result<Conversation, Error> { + let to_not_found = || Error::NotFound(conversation.clone()); + let to_deleted = || Error::Deleted(conversation.clone()); + + let mut tx = self.db.begin().await?; + let conversation = tx + .conversations() + .by_id(conversation) + .await + .not_found(to_not_found)?; + tx.commit().await?; + + conversation.as_snapshot().ok_or_else(to_deleted) + } + + pub async fn delete( + &self, + conversation: &Id, + deleted_at: &DateTime, + ) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + + let conversation = tx + .conversations() + .by_id(conversation) + .await + .not_found(|| DeleteError::NotFound(conversation.clone()))?; + conversation + .as_snapshot() + .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?; + + let mut events = Vec::new(); + + let messages = tx.messages().live(&conversation).await?; + let has_messages = messages + .iter() + .map(message::History::as_snapshot) + .any(|message| message.is_some()); + if has_messages { + return Err(DeleteError::NotEmpty(conversation.id().clone())); + } + + let deleted = tx.sequence().next(deleted_at).await?; + let conversation = tx.conversations().delete(&conversation, &deleted).await?; + events.extend( + conversation + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + + tx.commit().await?; + + self.events.broadcast(events); + + Ok(()) + } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> { + // Somewhat arbitrarily, expire after 7 days. Active conversation will not be + // expired until their messages expire. + let expire_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + let expired = tx.conversations().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for conversation in expired { + let deleted = tx.sequence().next(relative_to).await?; + let conversation = tx.conversations().delete(&conversation, &deleted).await?; + events.push( + conversation + .events() + .filter(Sequence::start_from(deleted.sequence)), + ); + } + + tx.commit().await?; + + self.events.broadcast( + events + .into_iter() + .kmerge_by(Sequence::merge) + .map(Event::from) + .collect::<Vec<_>>(), + ); + + Ok(()) + } + + pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, purge after 6 hours. + let purge_at = relative_to.to_owned() - TimeDelta::hours(6); + + let mut tx = self.db.begin().await?; + tx.conversations().purge(&purge_at).await?; + tx.commit().await?; + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum CreateError { + #[error("conversation named {0} already exists")] + DuplicateName(Name), + #[error("invalid conversation name: {0}")] + InvalidName(Name), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for CreateError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("conversation {0} not found")] + NotFound(Id), + #[error("conversation {0} deleted")] + Deleted(Id), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for Error { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("conversation {0} not found")] + NotFound(Id), + #[error("conversation {0} deleted")] + Deleted(Id), + #[error("conversation {0} not empty")] + NotEmpty(Id), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for DeleteError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ExpireError { + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for ExpireError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} diff --git a/src/conversation/event.rs b/src/conversation/event.rs new file mode 100644 index 0000000..f5e8a81 --- /dev/null +++ b/src/conversation/event.rs @@ -0,0 +1,46 @@ +use super::Conversation; +use crate::{ + conversation, + event::{Instant, Sequenced}, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "event", rename_all = "snake_case")] +pub enum Event { + Created(Created), + Deleted(Deleted), +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + match self { + Self::Created(event) => event.conversation.created, + Self::Deleted(event) => event.instant, + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Created { + #[serde(flatten)] + pub conversation: Conversation, +} + +impl From<Created> for Event { + fn from(event: Created) -> Self { + Self::Created(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Deleted { + #[serde(flatten)] + pub instant: Instant, + pub id: conversation::Id, +} + +impl From<Deleted> for Event { + fn from(event: Deleted) -> Self { + Self::Deleted(event) + } +} diff --git a/src/conversation/handlers/create/mod.rs b/src/conversation/handlers/create/mod.rs new file mode 100644 index 0000000..18eca1f --- /dev/null +++ b/src/conversation/handlers/create/mod.rs @@ -0,0 +1,67 @@ +use axum::{ + extract::{Json, State}, + http::StatusCode, + response::{self, IntoResponse}, +}; + +use crate::{ + app::App, + clock::RequestedAt, + conversation::{Conversation, app}, + error::Internal, + name::Name, + token::extract::Identity, +}; + +#[cfg(test)] +mod test; + +pub async fn handler( + State(app): State<App>, + _: Identity, // requires auth, but doesn't actually care who you are + RequestedAt(created_at): RequestedAt, + Json(request): Json<Request>, +) -> Result<Response, Error> { + let conversation = app + .conversations() + .create(&request.name, &created_at) + .await + .map_err(Error)?; + + Ok(Response(conversation)) +} + +#[derive(serde::Deserialize)] +pub struct Request { + pub name: Name, +} + +#[derive(Debug)] +pub struct Response(pub Conversation); + +impl IntoResponse for Response { + fn into_response(self) -> response::Response { + let Self(conversation) = self; + (StatusCode::ACCEPTED, Json(conversation)).into_response() + } +} + +#[derive(Debug)] +pub struct Error(pub app::CreateError); + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + let Self(error) = self; + match error { + app::CreateError::DuplicateName(_) => { + (StatusCode::CONFLICT, error.to_string()).into_response() + } + app::CreateError::InvalidName(_) => { + (StatusCode::BAD_REQUEST, error.to_string()).into_response() + } + app::CreateError::Name(_) | app::CreateError::Database(_) => { + Internal::from(error).into_response() + } + } + } +} diff --git a/src/conversation/handlers/create/test.rs b/src/conversation/handlers/create/test.rs new file mode 100644 index 0000000..bc05b00 --- /dev/null +++ b/src/conversation/handlers/create/test.rs @@ -0,0 +1,250 @@ +use std::future; + +use axum::extract::{Json, State}; +use futures::stream::StreamExt as _; +use itertools::Itertools; + +use crate::{ + conversation::app, + name::Name, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn new_conversation() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Call the endpoint + + let name = fixtures::conversation::propose(); + let request = super::Request { name: name.clone() }; + let super::Response(response) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creating a conversation in an empty app succeeds"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + let created = snapshot + .events + .into_iter() + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) + .exactly_one() + .expect("only one conversation has been created"); + assert_eq!(response, created.conversation); + + let conversation = app + .conversations() + .get(&response.id) + .await + .expect("the newly-created conversation exists"); + assert_eq!(response, conversation); + + let mut events = app + .events() + .subscribe(resume_point) + .await + .expect("subscribing never fails") + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::created) + .filter(|event| future::ready(event.conversation == response)); + + let event = events.next().expect_some("creation event published").await; + + assert_eq!(event.conversation, response); +} + +#[tokio::test] +async fn duplicate_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let request = super::Request { + name: conversation.name.clone(), + }; + let super::Error(error) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect_err("duplicate conversation name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::DuplicateName(name) if conversation.name == name + )); +} + +#[tokio::test] +async fn conflicting_canonical_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + + let existing_name = Name::from("rijksmuseum"); + app.conversations() + .create(&existing_name, &fixtures::now()) + .await + .expect("creating a conversation in an empty environment succeeds"); + + let conflicting_name = Name::from("r\u{0133}ksmuseum"); + + // Call the endpoint + + let request = super::Request { + name: conflicting_name.clone(), + }; + let super::Error(error) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect_err("duplicate conversation name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::DuplicateName(name) if conflicting_name == name + )); +} + +#[tokio::test] +async fn invalid_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let name = fixtures::conversation::propose_invalid_name(); + let request = super::Request { name: name.clone() }; + let super::Error(error) = crate::conversation::handlers::create::handler( + State(app.clone()), + creator, + fixtures::now(), + Json(request), + ) + .await + .expect_err("invalid conversation name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::InvalidName(error_name) if name == error_name + )); +} + +#[tokio::test] +async fn name_reusable_after_delete() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let name = fixtures::conversation::propose(); + + // Call the endpoint (first time) + + let request = super::Request { name: name.clone() }; + let super::Response(response) = super::handler( + State(app.clone()), + creator.clone(), + fixtures::now(), + Json(request), + ) + .await + .expect("new conversation in an empty app"); + + // Delete the conversation + + app.conversations() + .delete(&response.id, &fixtures::now()) + .await + .expect("deleting a newly-created conversation succeeds"); + + // Call the endpoint (second time) + + let request = super::Request { name: name.clone() }; + let super::Response(response) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creation succeeds after original conversation deleted"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let conversation = app + .conversations() + .get(&response.id) + .await + .expect("the newly-created conversation exists"); + assert_eq!(response, conversation); +} + +#[tokio::test] +async fn name_reusable_after_expiry() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::ancient()).await; + let name = fixtures::conversation::propose(); + + // Call the endpoint (first time) + + let request = super::Request { name: name.clone() }; + let super::Response(_) = super::handler( + State(app.clone()), + creator.clone(), + fixtures::ancient(), + Json(request), + ) + .await + .expect("new conversation in an empty app"); + + // Expire the conversation + + app.conversations() + .expire(&fixtures::now()) + .await + .expect("expiry always succeeds"); + + // Call the endpoint (second time) + + let request = super::Request { name: name.clone() }; + let super::Response(response) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creation succeeds after original conversation expired"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let conversation = app + .conversations() + .get(&response.id) + .await + .expect("the newly-created conversation exists"); + assert_eq!(response, conversation); +} diff --git a/src/conversation/handlers/delete/mod.rs b/src/conversation/handlers/delete/mod.rs new file mode 100644 index 0000000..272165a --- /dev/null +++ b/src/conversation/handlers/delete/mod.rs @@ -0,0 +1,61 @@ +use axum::{ + extract::{Json, Path, State}, + http::StatusCode, + response::{self, IntoResponse}, +}; + +use crate::{ + app::App, + clock::RequestedAt, + conversation::{self, app, handlers::PathInfo}, + error::{Internal, NotFound}, + token::extract::Identity, +}; + +#[cfg(test)] +mod test; + +pub async fn handler( + State(app): State<App>, + Path(conversation): Path<PathInfo>, + RequestedAt(deleted_at): RequestedAt, + _: Identity, +) -> Result<Response, Error> { + app.conversations() + .delete(&conversation, &deleted_at) + .await?; + + Ok(Response { id: conversation }) +} + +#[derive(Debug, serde::Serialize)] +pub struct Response { + pub id: conversation::Id, +} + +impl IntoResponse for Response { + fn into_response(self) -> response::Response { + (StatusCode::ACCEPTED, Json(self)).into_response() + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct Error(#[from] pub app::DeleteError); + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + let Self(error) = self; + match error { + app::DeleteError::NotFound(_) | app::DeleteError::Deleted(_) => { + NotFound(error).into_response() + } + app::DeleteError::NotEmpty(_) => { + (StatusCode::CONFLICT, error.to_string()).into_response() + } + app::DeleteError::Name(_) | app::DeleteError::Database(_) => { + Internal::from(error).into_response() + } + } + } +} diff --git a/src/conversation/handlers/delete/test.rs b/src/conversation/handlers/delete/test.rs new file mode 100644 index 0000000..2718d3b --- /dev/null +++ b/src/conversation/handlers/delete/test.rs @@ -0,0 +1,184 @@ +use axum::extract::{Path, State}; +use itertools::Itertools; + +use crate::{conversation::app, test::fixtures}; + +#[tokio::test] +pub async fn valid_conversation() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let response = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect("deleting a valid conversation succeeds"); + + // Verify the response + + assert_eq!(conversation.id, response.id); + + // Verify the semantics + + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + let created = snapshot + .events + .into_iter() + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) + .exactly_one() + .expect("only one conversation has been created"); + // We don't expect `conversation` to match the event exactly, as the name will have + // been tombstoned and the conversation given a `deleted_at` date. + assert_eq!(conversation.id, created.conversation.id); +} + +#[tokio::test] +pub async fn invalid_conversation_id() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::fictitious(); + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a nonexistent conversation fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotFound(id) if id == conversation)); +} + +#[tokio::test] +pub async fn conversation_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + + app.conversations() + .delete(&conversation.id, &fixtures::now()) + .await + .expect("deleting a recently-created conversation succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a deleted conversation fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::Deleted(id) if id == conversation.id)); +} + +#[tokio::test] +pub async fn conversation_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + + app.conversations() + .expire(&fixtures::now()) + .await + .expect("expiring conversations always succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting an expired conversation fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::Deleted(id) if id == conversation.id)); +} + +#[tokio::test] +pub async fn conversation_purged() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + + app.conversations() + .expire(&fixtures::old()) + .await + .expect("expiring conversations always succeeds"); + + app.conversations() + .purge(&fixtures::now()) + .await + .expect("purging conversations always succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a purged conversation fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotFound(id) if id == conversation.id)); +} + +#[tokio::test] +pub async fn conversation_not_empty() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a conversation with messages fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == conversation.id)); +} diff --git a/src/conversation/handlers/mod.rs b/src/conversation/handlers/mod.rs new file mode 100644 index 0000000..2fe727c --- /dev/null +++ b/src/conversation/handlers/mod.rs @@ -0,0 +1,9 @@ +mod create; +mod delete; +mod send; + +pub use create::handler as create; +pub use delete::handler as delete; +pub use send::handler as send; + +type PathInfo = crate::conversation::Id; diff --git a/src/conversation/handlers/send/mod.rs b/src/conversation/handlers/send/mod.rs new file mode 100644 index 0000000..9ec020a --- /dev/null +++ b/src/conversation/handlers/send/mod.rs @@ -0,0 +1,63 @@ +use axum::{ + extract::{Json, Path, State}, + http::StatusCode, + response::{self, IntoResponse}, +}; + +use crate::conversation::handlers::PathInfo; +use crate::{ + app::App, + clock::RequestedAt, + error::{Internal, NotFound}, + message::{Body, Message, app::SendError}, + token::extract::Identity, +}; + +#[cfg(test)] +mod test; + +pub async fn handler( + State(app): State<App>, + Path(conversation): Path<PathInfo>, + RequestedAt(sent_at): RequestedAt, + identity: Identity, + Json(request): Json<Request>, +) -> Result<Response, Error> { + let message = app + .messages() + .send(&conversation, &identity.user, &sent_at, &request.body) + .await?; + + Ok(Response(message)) +} + +#[derive(serde::Deserialize)] +pub struct Request { + pub body: Body, +} + +#[derive(Debug)] +pub struct Response(pub Message); + +impl IntoResponse for Response { + fn into_response(self) -> response::Response { + let Self(message) = self; + (StatusCode::ACCEPTED, Json(message)).into_response() + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct Error(#[from] pub SendError); + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + let Self(error) = self; + match error { + SendError::ConversationNotFound(_) | SendError::ConversationDeleted(_) => { + NotFound(error).into_response() + } + SendError::Name(_) | SendError::Database(_) => Internal::from(error).into_response(), + } + } +} diff --git a/src/conversation/handlers/send/test.rs b/src/conversation/handlers/send/test.rs new file mode 100644 index 0000000..bd32510 --- /dev/null +++ b/src/conversation/handlers/send/test.rs @@ -0,0 +1,130 @@ +use axum::extract::{Json, Path, State}; +use futures::stream::{self, StreamExt as _}; + +use crate::{ + conversation, + event::Sequenced, + message::app::SendError, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn messages_in_order() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Call the endpoint (twice) + + let requests = vec![ + (fixtures::now(), fixtures::message::propose()), + (fixtures::now(), fixtures::message::propose()), + ]; + + for (sent_at, body) in &requests { + let request = super::Request { body: body.clone() }; + + let _ = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + sent_at.clone(), + sender.clone(), + Json(request), + ) + .await + .expect("sending to a valid conversation succeeds"); + } + + // Verify the semantics + + let mut events = app + .events() + .subscribe(resume_point) + .await + .expect("subscribing always succeeds") + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) + .zip(stream::iter(requests)); + + while let Some((event, (sent_at, body))) = events + .next() + .expect_ready("an event should be ready for each message") + .await + { + assert_eq!(*sent_at, event.at()); + assert_eq!(sender.user.id, event.message.sender); + assert_eq!(body, event.message.body); + } +} + +#[tokio::test] +async fn nonexistent_conversation() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let sent_at = fixtures::now(); + let conversation = conversation::Id::generate(); + let request = super::Request { + body: fixtures::message::propose(), + }; + let super::Error(error) = super::handler( + State(app), + Path(conversation.clone()), + sent_at, + sender, + Json(request), + ) + .await + .expect_err("sending to a nonexistent conversation fails"); + + // Verify the structure of the response + + assert!(matches!( + error, + SendError::ConversationNotFound(error_conversation) if conversation == error_conversation + )); +} + +#[tokio::test] +async fn deleted_conversation() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + + app.conversations() + .delete(&conversation.id, &fixtures::now()) + .await + .expect("deleting a new conversation succeeds"); + + // Call the endpoint + + let sent_at = fixtures::now(); + let request = super::Request { + body: fixtures::message::propose(), + }; + let super::Error(error) = super::handler( + State(app), + Path(conversation.id.clone()), + sent_at, + sender, + Json(request), + ) + .await + .expect_err("sending to a deleted conversation fails"); + + // Verify the structure of the response + + assert!(matches!( + error, + SendError::ConversationDeleted(error_conversation) if conversation.id == error_conversation + )); +} diff --git a/src/conversation/history.rs b/src/conversation/history.rs new file mode 100644 index 0000000..601614c --- /dev/null +++ b/src/conversation/history.rs @@ -0,0 +1,69 @@ +use itertools::Itertools as _; + +use super::{ + Conversation, Id, + event::{Created, Deleted, Event}, +}; +use crate::event::{Instant, Sequence}; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct History { + pub conversation: Conversation, + pub deleted: Option<Instant>, +} + +// State interface +impl History { + pub fn id(&self) -> &Id { + &self.conversation.id + } + + // Snapshot of this conversation as it was when created. (Note to the future: + // it's okay if this returns a redacted or modified version of the conversation. + // If we implement renames by redacting the original name, then this should + // return the renamed conversation, not the original, even if that's not how + // it was "as created.") + pub fn as_created(&self) -> Conversation { + self.conversation.clone() + } + + pub fn as_of<S>(&self, sequence: S) -> Option<Conversation> + where + S: Into<Sequence>, + { + self.events() + .filter(Sequence::up_to(sequence.into())) + .collect() + } + + // Snapshot of this conversation as of all events recorded in this history. + pub fn as_snapshot(&self) -> Option<Conversation> { + self.events().collect() + } +} + +// Event factories +impl History { + pub fn events(&self) -> impl Iterator<Item = Event> + use<> { + [self.created()] + .into_iter() + .merge_by(self.deleted(), Sequence::merge) + } + + fn created(&self) -> Event { + Created { + conversation: self.conversation.clone(), + } + .into() + } + + fn deleted(&self) -> Option<Event> { + self.deleted.map(|instant| { + Deleted { + instant, + id: self.conversation.id.clone(), + } + .into() + }) + } +} diff --git a/src/conversation/id.rs b/src/conversation/id.rs new file mode 100644 index 0000000..5f37a59 --- /dev/null +++ b/src/conversation/id.rs @@ -0,0 +1,38 @@ +use std::fmt; + +use crate::id::Id as BaseId; + +// Stable identifier for a [Conversation]. Prefixed with `C`. +#[derive( + Clone, + Debug, + Eq, + Hash, + Ord, + PartialEq, + PartialOrd, + sqlx::Type, + serde::Deserialize, + serde::Serialize, +)] +#[sqlx(transparent)] +#[serde(transparent)] +pub struct Id(BaseId); + +impl From<BaseId> for Id { + fn from(id: BaseId) -> Self { + Self(id) + } +} + +impl Id { + pub fn generate() -> Self { + BaseId::generate("C") + } +} + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/src/conversation/mod.rs b/src/conversation/mod.rs new file mode 100644 index 0000000..3dfa187 --- /dev/null +++ b/src/conversation/mod.rs @@ -0,0 +1,10 @@ +pub mod app; +pub mod event; +pub mod handlers; +mod history; +mod id; +pub mod repo; +mod snapshot; +mod validate; + +pub use self::{event::Event, history::History, id::Id, snapshot::Conversation}; diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs new file mode 100644 index 0000000..82b5f01 --- /dev/null +++ b/src/conversation/repo.rs @@ -0,0 +1,332 @@ +use futures::stream::{StreamExt as _, TryStreamExt as _}; +use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; + +use crate::{ + clock::DateTime, + conversation::{Conversation, History, Id}, + db::NotFound, + event::{Instant, Sequence}, + name::{self, Name}, +}; + +pub trait Provider { + fn conversations(&mut self) -> Conversations; +} + +impl Provider for Transaction<'_, Sqlite> { + fn conversations(&mut self) -> Conversations { + Conversations(self) + } +} + +pub struct Conversations<'t>(&'t mut SqliteConnection); + +impl Conversations<'_> { + pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> { + let id = Id::generate(); + let name = name.clone(); + let display_name = name.display(); + let canonical_name = name.canonical(); + let created = *created; + + sqlx::query!( + r#" + insert into conversation (id, created_at, created_sequence, last_sequence) + values ($1, $2, $3, $4) + "#, + id, + created.at, + created.sequence, + created.sequence, + ) + .execute(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_name (id, display_name, canonical_name) + values ($1, $2, $3) + "#, + id, + display_name, + canonical_name, + ) + .execute(&mut *self.0) + .await?; + + let conversation = History { + conversation: Conversation { + created, + id, + name: name.clone(), + deleted_at: None, + }, + deleted: None, + }; + + Ok(conversation) + } + + pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> { + let conversation = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where id = $1 + "#, + conversation, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch_one(&mut *self.0) + .await??; + + Ok(conversation) + } + + pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { + let conversations = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where conversation.created_sequence <= $1 + order by name.canonical_name + "#, + resume_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } + + pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { + let conversations = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where conversation.last_sequence > $1 + "#, + resume_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } + + pub async fn delete( + &mut self, + conversation: &History, + deleted: &Instant, + ) -> Result<History, LoadError> { + let id = conversation.id(); + sqlx::query!( + r#" + update conversation + set last_sequence = max(last_sequence, $1) + where id = $2 + returning id as "id: Id" + "#, + deleted.sequence, + id, + ) + .fetch_one(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + deleted.at, + deleted.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a conversation is deleted, its + // name is retconned to have been the empty string. Someone reading the event + // stream afterwards, or looking at conversations via the API, cannot retrieve + // the "deleted" conversation's information by ignoring the deletion event. + sqlx::query!( + r#" + delete from conversation_name + where id = $1 + "#, + id, + ) + .execute(&mut *self.0) + .await?; + + let conversation = self.by_id(id).await?; + + Ok(conversation) + } + + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { + let conversations = sqlx::query_scalar!( + r#" + with has_messages as ( + select conversation + from message + group by conversation + ) + delete from conversation_deleted + where deleted_at < $1 + and id not in has_messages + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for conversation in conversations { + // Wanted: a way to batch these up into one query. + sqlx::query!( + r#" + delete from conversation + where id = $1 + "#, + conversation, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> { + let conversations = sqlx::query!( + r#" + select + conversation.id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + left join message + on conversation.id = message.conversation + where conversation.created_at < $1 + and message.id is null + and deleted.id is null + "#, + expired_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum LoadError { + Database(#[from] sqlx::Error), + Name(#[from] name::Error), +} + +impl<T> NotFound for Result<T, LoadError> { + type Ok = T; + type Error = LoadError; + + fn optional(self) -> Result<Option<T>, LoadError> { + match self { + Ok(value) => Ok(Some(value)), + Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None), + Err(other) => Err(other), + } + } +} diff --git a/src/conversation/snapshot.rs b/src/conversation/snapshot.rs new file mode 100644 index 0000000..da9eaae --- /dev/null +++ b/src/conversation/snapshot.rs @@ -0,0 +1,43 @@ +use super::{ + Id, + event::{Created, Event}, +}; +use crate::{clock::DateTime, event::Instant, name::Name}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Conversation { + #[serde(flatten)] + pub created: Instant, + pub id: Id, + pub name: Name, + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_at: Option<DateTime>, +} + +impl Conversation { + fn apply(state: Option<Self>, event: Event) -> Option<Self> { + match (state, event) { + (None, Event::Created(event)) => Some(event.into()), + (Some(conversation), Event::Deleted(event)) if conversation.id == event.id => None, + (state, event) => panic!("invalid conversation event {event:#?} for state {state:#?}"), + } + } +} + +impl FromIterator<Event> for Option<Conversation> { + fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self { + events.into_iter().fold(None, Conversation::apply) + } +} + +impl From<&Created> for Conversation { + fn from(event: &Created) -> Self { + event.conversation.clone() + } +} + +impl From<Created> for Conversation { + fn from(event: Created) -> Self { + event.conversation + } +} diff --git a/src/conversation/validate.rs b/src/conversation/validate.rs new file mode 100644 index 0000000..7894e0c --- /dev/null +++ b/src/conversation/validate.rs @@ -0,0 +1,25 @@ +use std::ops::Not as _; + +use unicode_segmentation::UnicodeSegmentation as _; + +use crate::name::Name; + +// Picked out of a hat. The power of two is not meaningful. +const NAME_TOO_LONG: usize = 64; + +pub fn name(name: &Name) -> bool { + let display = name.display(); + + [ + display.graphemes(true).count() < NAME_TOO_LONG, + display.chars().any(char::is_control).not(), + display.chars().next().is_some_and(|c| !c.is_whitespace()), + display.chars().last().is_some_and(|c| !c.is_whitespace()), + display + .chars() + .zip(display.chars().skip(1)) + .all(|(a, b)| !(a.is_whitespace() && b.is_whitespace())), + ] + .into_iter() + .all(|value| value) +} |
