diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-06-30 22:00:57 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-07-03 22:43:42 -0400 |
| commit | a15e3d580124f561864c6a39f1e035eb1b3aab13 (patch) | |
| tree | ef80f725e7b02547a23b5c29a482fbf3fd188c0d /src/conversation | |
| parent | 5af4aea1e2f143499529b70f9cf191c6994265c6 (diff) | |
Rename "channel" to "conversation" within the server.
I've split this from the schema and API changes because, frankly, it's huge. Annoyingly so. There are no semantic changes in this, it's all symbol changes, but there are a _lot_ of them because the term "channel" leaks all over everything in a service whose primary role is managing messages sent to channels (now, conversations).
I found a buggy test while working on this! It's not fixed in this commit, because it felt mean to hide a real change in the middle of this much chaff.
Diffstat (limited to 'src/conversation')
| -rw-r--r-- | src/conversation/app.rs | 236 | ||||
| -rw-r--r-- | src/conversation/event.rs | 46 | ||||
| -rw-r--r-- | src/conversation/handlers/create/mod.rs | 67 | ||||
| -rw-r--r-- | src/conversation/handlers/create/test.rs | 250 | ||||
| -rw-r--r-- | src/conversation/handlers/delete/mod.rs | 61 | ||||
| -rw-r--r-- | src/conversation/handlers/delete/test.rs | 184 | ||||
| -rw-r--r-- | src/conversation/handlers/mod.rs | 9 | ||||
| -rw-r--r-- | src/conversation/handlers/send/mod.rs | 63 | ||||
| -rw-r--r-- | src/conversation/handlers/send/test.rs | 130 | ||||
| -rw-r--r-- | src/conversation/history.rs | 69 | ||||
| -rw-r--r-- | src/conversation/id.rs | 38 | ||||
| -rw-r--r-- | src/conversation/mod.rs | 10 | ||||
| -rw-r--r-- | src/conversation/repo.rs | 332 | ||||
| -rw-r--r-- | src/conversation/snapshot.rs | 43 | ||||
| -rw-r--r-- | src/conversation/validate.rs | 25 |
15 files changed, 1563 insertions, 0 deletions
diff --git a/src/conversation/app.rs b/src/conversation/app.rs new file mode 100644 index 0000000..81ccdcf --- /dev/null +++ b/src/conversation/app.rs @@ -0,0 +1,236 @@ +use chrono::TimeDelta; +use itertools::Itertools; +use sqlx::sqlite::SqlitePool; + +use super::{ + Conversation, Id, + repo::{LoadError, Provider as _}, + validate, +}; +use crate::{ + clock::DateTime, + db::{Duplicate as _, NotFound as _}, + event::{Broadcaster, Event, Sequence, repo::Provider as _}, + message::{self, repo::Provider as _}, + name::{self, Name}, +}; + +pub struct Conversations<'a> { + db: &'a SqlitePool, + events: &'a Broadcaster, +} + +impl<'a> Conversations<'a> { + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } + } + + pub async fn create( + &self, + name: &Name, + created_at: &DateTime, + ) -> Result<Conversation, CreateError> { + if !validate::name(name) { + return Err(CreateError::InvalidName(name.clone())); + } + + let mut tx = self.db.begin().await?; + let created = tx.sequence().next(created_at).await?; + let conversation = tx + .conversations() + .create(name, &created) + .await + .duplicate(|| CreateError::DuplicateName(name.clone()))?; + tx.commit().await?; + + self.events + .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>()); + + Ok(conversation.as_created()) + } + + // This function is careless with respect to time, and gets you the + // conversation as it exists in the specific moment when you call it. + pub async fn get(&self, conversation: &Id) -> Result<Conversation, Error> { + let to_not_found = || Error::NotFound(conversation.clone()); + let to_deleted = || Error::Deleted(conversation.clone()); + + let mut tx = self.db.begin().await?; + let conversation = tx + .conversations() + .by_id(conversation) + .await + .not_found(to_not_found)?; + tx.commit().await?; + + conversation.as_snapshot().ok_or_else(to_deleted) + } + + pub async fn delete( + &self, + conversation: &Id, + deleted_at: &DateTime, + ) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + + let conversation = tx + .conversations() + .by_id(conversation) + .await + .not_found(|| DeleteError::NotFound(conversation.clone()))?; + conversation + .as_snapshot() + .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?; + + let mut events = Vec::new(); + + let messages = tx.messages().live(&conversation).await?; + let has_messages = messages + .iter() + .map(message::History::as_snapshot) + .any(|message| message.is_some()); + if has_messages { + return Err(DeleteError::NotEmpty(conversation.id().clone())); + } + + let deleted = tx.sequence().next(deleted_at).await?; + let conversation = tx.conversations().delete(&conversation, &deleted).await?; + events.extend( + conversation + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + + tx.commit().await?; + + self.events.broadcast(events); + + Ok(()) + } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> { + // Somewhat arbitrarily, expire after 7 days. Active conversation will not be + // expired until their messages expire. + let expire_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + let expired = tx.conversations().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for conversation in expired { + let deleted = tx.sequence().next(relative_to).await?; + let conversation = tx.conversations().delete(&conversation, &deleted).await?; + events.push( + conversation + .events() + .filter(Sequence::start_from(deleted.sequence)), + ); + } + + tx.commit().await?; + + self.events.broadcast( + events + .into_iter() + .kmerge_by(Sequence::merge) + .map(Event::from) + .collect::<Vec<_>>(), + ); + + Ok(()) + } + + pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, purge after 6 hours. + let purge_at = relative_to.to_owned() - TimeDelta::hours(6); + + let mut tx = self.db.begin().await?; + tx.conversations().purge(&purge_at).await?; + tx.commit().await?; + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum CreateError { + #[error("conversation named {0} already exists")] + DuplicateName(Name), + #[error("invalid conversation name: {0}")] + InvalidName(Name), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for CreateError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("conversation {0} not found")] + NotFound(Id), + #[error("conversation {0} deleted")] + Deleted(Id), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for Error { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("conversation {0} not found")] + NotFound(Id), + #[error("conversation {0} deleted")] + Deleted(Id), + #[error("conversation {0} not empty")] + NotEmpty(Id), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for DeleteError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ExpireError { + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for ExpireError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} diff --git a/src/conversation/event.rs b/src/conversation/event.rs new file mode 100644 index 0000000..f5e8a81 --- /dev/null +++ b/src/conversation/event.rs @@ -0,0 +1,46 @@ +use super::Conversation; +use crate::{ + conversation, + event::{Instant, Sequenced}, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "event", rename_all = "snake_case")] +pub enum Event { + Created(Created), + Deleted(Deleted), +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + match self { + Self::Created(event) => event.conversation.created, + Self::Deleted(event) => event.instant, + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Created { + #[serde(flatten)] + pub conversation: Conversation, +} + +impl From<Created> for Event { + fn from(event: Created) -> Self { + Self::Created(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Deleted { + #[serde(flatten)] + pub instant: Instant, + pub id: conversation::Id, +} + +impl From<Deleted> for Event { + fn from(event: Deleted) -> Self { + Self::Deleted(event) + } +} diff --git a/src/conversation/handlers/create/mod.rs b/src/conversation/handlers/create/mod.rs new file mode 100644 index 0000000..18eca1f --- /dev/null +++ b/src/conversation/handlers/create/mod.rs @@ -0,0 +1,67 @@ +use axum::{ + extract::{Json, State}, + http::StatusCode, + response::{self, IntoResponse}, +}; + +use crate::{ + app::App, + clock::RequestedAt, + conversation::{Conversation, app}, + error::Internal, + name::Name, + token::extract::Identity, +}; + +#[cfg(test)] +mod test; + +pub async fn handler( + State(app): State<App>, + _: Identity, // requires auth, but doesn't actually care who you are + RequestedAt(created_at): RequestedAt, + Json(request): Json<Request>, +) -> Result<Response, Error> { + let conversation = app + .conversations() + .create(&request.name, &created_at) + .await + .map_err(Error)?; + + Ok(Response(conversation)) +} + +#[derive(serde::Deserialize)] +pub struct Request { + pub name: Name, +} + +#[derive(Debug)] +pub struct Response(pub Conversation); + +impl IntoResponse for Response { + fn into_response(self) -> response::Response { + let Self(conversation) = self; + (StatusCode::ACCEPTED, Json(conversation)).into_response() + } +} + +#[derive(Debug)] +pub struct Error(pub app::CreateError); + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + let Self(error) = self; + match error { + app::CreateError::DuplicateName(_) => { + (StatusCode::CONFLICT, error.to_string()).into_response() + } + app::CreateError::InvalidName(_) => { + (StatusCode::BAD_REQUEST, error.to_string()).into_response() + } + app::CreateError::Name(_) | app::CreateError::Database(_) => { + Internal::from(error).into_response() + } + } + } +} diff --git a/src/conversation/handlers/create/test.rs b/src/conversation/handlers/create/test.rs new file mode 100644 index 0000000..bc05b00 --- /dev/null +++ b/src/conversation/handlers/create/test.rs @@ -0,0 +1,250 @@ +use std::future; + +use axum::extract::{Json, State}; +use futures::stream::StreamExt as _; +use itertools::Itertools; + +use crate::{ + conversation::app, + name::Name, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn new_conversation() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Call the endpoint + + let name = fixtures::conversation::propose(); + let request = super::Request { name: name.clone() }; + let super::Response(response) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creating a conversation in an empty app succeeds"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + let created = snapshot + .events + .into_iter() + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) + .exactly_one() + .expect("only one conversation has been created"); + assert_eq!(response, created.conversation); + + let conversation = app + .conversations() + .get(&response.id) + .await + .expect("the newly-created conversation exists"); + assert_eq!(response, conversation); + + let mut events = app + .events() + .subscribe(resume_point) + .await + .expect("subscribing never fails") + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::created) + .filter(|event| future::ready(event.conversation == response)); + + let event = events.next().expect_some("creation event published").await; + + assert_eq!(event.conversation, response); +} + +#[tokio::test] +async fn duplicate_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let request = super::Request { + name: conversation.name.clone(), + }; + let super::Error(error) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect_err("duplicate conversation name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::DuplicateName(name) if conversation.name == name + )); +} + +#[tokio::test] +async fn conflicting_canonical_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + + let existing_name = Name::from("rijksmuseum"); + app.conversations() + .create(&existing_name, &fixtures::now()) + .await + .expect("creating a conversation in an empty environment succeeds"); + + let conflicting_name = Name::from("r\u{0133}ksmuseum"); + + // Call the endpoint + + let request = super::Request { + name: conflicting_name.clone(), + }; + let super::Error(error) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect_err("duplicate conversation name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::DuplicateName(name) if conflicting_name == name + )); +} + +#[tokio::test] +async fn invalid_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let name = fixtures::conversation::propose_invalid_name(); + let request = super::Request { name: name.clone() }; + let super::Error(error) = crate::conversation::handlers::create::handler( + State(app.clone()), + creator, + fixtures::now(), + Json(request), + ) + .await + .expect_err("invalid conversation name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::InvalidName(error_name) if name == error_name + )); +} + +#[tokio::test] +async fn name_reusable_after_delete() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let name = fixtures::conversation::propose(); + + // Call the endpoint (first time) + + let request = super::Request { name: name.clone() }; + let super::Response(response) = super::handler( + State(app.clone()), + creator.clone(), + fixtures::now(), + Json(request), + ) + .await + .expect("new conversation in an empty app"); + + // Delete the conversation + + app.conversations() + .delete(&response.id, &fixtures::now()) + .await + .expect("deleting a newly-created conversation succeeds"); + + // Call the endpoint (second time) + + let request = super::Request { name: name.clone() }; + let super::Response(response) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creation succeeds after original conversation deleted"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let conversation = app + .conversations() + .get(&response.id) + .await + .expect("the newly-created conversation exists"); + assert_eq!(response, conversation); +} + +#[tokio::test] +async fn name_reusable_after_expiry() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::ancient()).await; + let name = fixtures::conversation::propose(); + + // Call the endpoint (first time) + + let request = super::Request { name: name.clone() }; + let super::Response(_) = super::handler( + State(app.clone()), + creator.clone(), + fixtures::ancient(), + Json(request), + ) + .await + .expect("new conversation in an empty app"); + + // Expire the conversation + + app.conversations() + .expire(&fixtures::now()) + .await + .expect("expiry always succeeds"); + + // Call the endpoint (second time) + + let request = super::Request { name: name.clone() }; + let super::Response(response) = + super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creation succeeds after original conversation expired"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let conversation = app + .conversations() + .get(&response.id) + .await + .expect("the newly-created conversation exists"); + assert_eq!(response, conversation); +} diff --git a/src/conversation/handlers/delete/mod.rs b/src/conversation/handlers/delete/mod.rs new file mode 100644 index 0000000..272165a --- /dev/null +++ b/src/conversation/handlers/delete/mod.rs @@ -0,0 +1,61 @@ +use axum::{ + extract::{Json, Path, State}, + http::StatusCode, + response::{self, IntoResponse}, +}; + +use crate::{ + app::App, + clock::RequestedAt, + conversation::{self, app, handlers::PathInfo}, + error::{Internal, NotFound}, + token::extract::Identity, +}; + +#[cfg(test)] +mod test; + +pub async fn handler( + State(app): State<App>, + Path(conversation): Path<PathInfo>, + RequestedAt(deleted_at): RequestedAt, + _: Identity, +) -> Result<Response, Error> { + app.conversations() + .delete(&conversation, &deleted_at) + .await?; + + Ok(Response { id: conversation }) +} + +#[derive(Debug, serde::Serialize)] +pub struct Response { + pub id: conversation::Id, +} + +impl IntoResponse for Response { + fn into_response(self) -> response::Response { + (StatusCode::ACCEPTED, Json(self)).into_response() + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct Error(#[from] pub app::DeleteError); + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + let Self(error) = self; + match error { + app::DeleteError::NotFound(_) | app::DeleteError::Deleted(_) => { + NotFound(error).into_response() + } + app::DeleteError::NotEmpty(_) => { + (StatusCode::CONFLICT, error.to_string()).into_response() + } + app::DeleteError::Name(_) | app::DeleteError::Database(_) => { + Internal::from(error).into_response() + } + } + } +} diff --git a/src/conversation/handlers/delete/test.rs b/src/conversation/handlers/delete/test.rs new file mode 100644 index 0000000..2718d3b --- /dev/null +++ b/src/conversation/handlers/delete/test.rs @@ -0,0 +1,184 @@ +use axum::extract::{Path, State}; +use itertools::Itertools; + +use crate::{conversation::app, test::fixtures}; + +#[tokio::test] +pub async fn valid_conversation() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let response = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect("deleting a valid conversation succeeds"); + + // Verify the response + + assert_eq!(conversation.id, response.id); + + // Verify the semantics + + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + let created = snapshot + .events + .into_iter() + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) + .exactly_one() + .expect("only one conversation has been created"); + // We don't expect `conversation` to match the event exactly, as the name will have + // been tombstoned and the conversation given a `deleted_at` date. + assert_eq!(conversation.id, created.conversation.id); +} + +#[tokio::test] +pub async fn invalid_conversation_id() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::fictitious(); + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a nonexistent conversation fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotFound(id) if id == conversation)); +} + +#[tokio::test] +pub async fn conversation_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + + app.conversations() + .delete(&conversation.id, &fixtures::now()) + .await + .expect("deleting a recently-created conversation succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a deleted conversation fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::Deleted(id) if id == conversation.id)); +} + +#[tokio::test] +pub async fn conversation_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + + app.conversations() + .expire(&fixtures::now()) + .await + .expect("expiring conversations always succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting an expired conversation fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::Deleted(id) if id == conversation.id)); +} + +#[tokio::test] +pub async fn conversation_purged() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + + app.conversations() + .expire(&fixtures::old()) + .await + .expect("expiring conversations always succeeds"); + + app.conversations() + .purge(&fixtures::now()) + .await + .expect("purging conversations always succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a purged conversation fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotFound(id) if id == conversation.id)); +} + +#[tokio::test] +pub async fn conversation_not_empty() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let sender = fixtures::user::create(&app, &fixtures::now()).await; + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Error(error) = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a conversation with messages fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == conversation.id)); +} diff --git a/src/conversation/handlers/mod.rs b/src/conversation/handlers/mod.rs new file mode 100644 index 0000000..2fe727c --- /dev/null +++ b/src/conversation/handlers/mod.rs @@ -0,0 +1,9 @@ +mod create; +mod delete; +mod send; + +pub use create::handler as create; +pub use delete::handler as delete; +pub use send::handler as send; + +type PathInfo = crate::conversation::Id; diff --git a/src/conversation/handlers/send/mod.rs b/src/conversation/handlers/send/mod.rs new file mode 100644 index 0000000..9ec020a --- /dev/null +++ b/src/conversation/handlers/send/mod.rs @@ -0,0 +1,63 @@ +use axum::{ + extract::{Json, Path, State}, + http::StatusCode, + response::{self, IntoResponse}, +}; + +use crate::conversation::handlers::PathInfo; +use crate::{ + app::App, + clock::RequestedAt, + error::{Internal, NotFound}, + message::{Body, Message, app::SendError}, + token::extract::Identity, +}; + +#[cfg(test)] +mod test; + +pub async fn handler( + State(app): State<App>, + Path(conversation): Path<PathInfo>, + RequestedAt(sent_at): RequestedAt, + identity: Identity, + Json(request): Json<Request>, +) -> Result<Response, Error> { + let message = app + .messages() + .send(&conversation, &identity.user, &sent_at, &request.body) + .await?; + + Ok(Response(message)) +} + +#[derive(serde::Deserialize)] +pub struct Request { + pub body: Body, +} + +#[derive(Debug)] +pub struct Response(pub Message); + +impl IntoResponse for Response { + fn into_response(self) -> response::Response { + let Self(message) = self; + (StatusCode::ACCEPTED, Json(message)).into_response() + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct Error(#[from] pub SendError); + +impl IntoResponse for Error { + fn into_response(self) -> response::Response { + let Self(error) = self; + match error { + SendError::ConversationNotFound(_) | SendError::ConversationDeleted(_) => { + NotFound(error).into_response() + } + SendError::Name(_) | SendError::Database(_) => Internal::from(error).into_response(), + } + } +} diff --git a/src/conversation/handlers/send/test.rs b/src/conversation/handlers/send/test.rs new file mode 100644 index 0000000..bd32510 --- /dev/null +++ b/src/conversation/handlers/send/test.rs @@ -0,0 +1,130 @@ +use axum::extract::{Json, Path, State}; +use futures::stream::{self, StreamExt as _}; + +use crate::{ + conversation, + event::Sequenced, + message::app::SendError, + test::fixtures::{self, future::Expect as _}, +}; + +#[tokio::test] +async fn messages_in_order() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Call the endpoint (twice) + + let requests = vec![ + (fixtures::now(), fixtures::message::propose()), + (fixtures::now(), fixtures::message::propose()), + ]; + + for (sent_at, body) in &requests { + let request = super::Request { body: body.clone() }; + + let _ = super::handler( + State(app.clone()), + Path(conversation.id.clone()), + sent_at.clone(), + sender.clone(), + Json(request), + ) + .await + .expect("sending to a valid conversation succeeds"); + } + + // Verify the semantics + + let mut events = app + .events() + .subscribe(resume_point) + .await + .expect("subscribing always succeeds") + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) + .zip(stream::iter(requests)); + + while let Some((event, (sent_at, body))) = events + .next() + .expect_ready("an event should be ready for each message") + .await + { + assert_eq!(*sent_at, event.at()); + assert_eq!(sender.user.id, event.message.sender); + assert_eq!(body, event.message.body); + } +} + +#[tokio::test] +async fn nonexistent_conversation() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let sent_at = fixtures::now(); + let conversation = conversation::Id::generate(); + let request = super::Request { + body: fixtures::message::propose(), + }; + let super::Error(error) = super::handler( + State(app), + Path(conversation.clone()), + sent_at, + sender, + Json(request), + ) + .await + .expect_err("sending to a nonexistent conversation fails"); + + // Verify the structure of the response + + assert!(matches!( + error, + SendError::ConversationNotFound(error_conversation) if conversation == error_conversation + )); +} + +#[tokio::test] +async fn deleted_conversation() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + + app.conversations() + .delete(&conversation.id, &fixtures::now()) + .await + .expect("deleting a new conversation succeeds"); + + // Call the endpoint + + let sent_at = fixtures::now(); + let request = super::Request { + body: fixtures::message::propose(), + }; + let super::Error(error) = super::handler( + State(app), + Path(conversation.id.clone()), + sent_at, + sender, + Json(request), + ) + .await + .expect_err("sending to a deleted conversation fails"); + + // Verify the structure of the response + + assert!(matches!( + error, + SendError::ConversationDeleted(error_conversation) if conversation.id == error_conversation + )); +} diff --git a/src/conversation/history.rs b/src/conversation/history.rs new file mode 100644 index 0000000..601614c --- /dev/null +++ b/src/conversation/history.rs @@ -0,0 +1,69 @@ +use itertools::Itertools as _; + +use super::{ + Conversation, Id, + event::{Created, Deleted, Event}, +}; +use crate::event::{Instant, Sequence}; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct History { + pub conversation: Conversation, + pub deleted: Option<Instant>, +} + +// State interface +impl History { + pub fn id(&self) -> &Id { + &self.conversation.id + } + + // Snapshot of this conversation as it was when created. (Note to the future: + // it's okay if this returns a redacted or modified version of the conversation. + // If we implement renames by redacting the original name, then this should + // return the renamed conversation, not the original, even if that's not how + // it was "as created.") + pub fn as_created(&self) -> Conversation { + self.conversation.clone() + } + + pub fn as_of<S>(&self, sequence: S) -> Option<Conversation> + where + S: Into<Sequence>, + { + self.events() + .filter(Sequence::up_to(sequence.into())) + .collect() + } + + // Snapshot of this conversation as of all events recorded in this history. + pub fn as_snapshot(&self) -> Option<Conversation> { + self.events().collect() + } +} + +// Event factories +impl History { + pub fn events(&self) -> impl Iterator<Item = Event> + use<> { + [self.created()] + .into_iter() + .merge_by(self.deleted(), Sequence::merge) + } + + fn created(&self) -> Event { + Created { + conversation: self.conversation.clone(), + } + .into() + } + + fn deleted(&self) -> Option<Event> { + self.deleted.map(|instant| { + Deleted { + instant, + id: self.conversation.id.clone(), + } + .into() + }) + } +} diff --git a/src/conversation/id.rs b/src/conversation/id.rs new file mode 100644 index 0000000..5f37a59 --- /dev/null +++ b/src/conversation/id.rs @@ -0,0 +1,38 @@ +use std::fmt; + +use crate::id::Id as BaseId; + +// Stable identifier for a [Conversation]. Prefixed with `C`. +#[derive( + Clone, + Debug, + Eq, + Hash, + Ord, + PartialEq, + PartialOrd, + sqlx::Type, + serde::Deserialize, + serde::Serialize, +)] +#[sqlx(transparent)] +#[serde(transparent)] +pub struct Id(BaseId); + +impl From<BaseId> for Id { + fn from(id: BaseId) -> Self { + Self(id) + } +} + +impl Id { + pub fn generate() -> Self { + BaseId::generate("C") + } +} + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/src/conversation/mod.rs b/src/conversation/mod.rs new file mode 100644 index 0000000..3dfa187 --- /dev/null +++ b/src/conversation/mod.rs @@ -0,0 +1,10 @@ +pub mod app; +pub mod event; +pub mod handlers; +mod history; +mod id; +pub mod repo; +mod snapshot; +mod validate; + +pub use self::{event::Event, history::History, id::Id, snapshot::Conversation}; diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs new file mode 100644 index 0000000..82b5f01 --- /dev/null +++ b/src/conversation/repo.rs @@ -0,0 +1,332 @@ +use futures::stream::{StreamExt as _, TryStreamExt as _}; +use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; + +use crate::{ + clock::DateTime, + conversation::{Conversation, History, Id}, + db::NotFound, + event::{Instant, Sequence}, + name::{self, Name}, +}; + +pub trait Provider { + fn conversations(&mut self) -> Conversations; +} + +impl Provider for Transaction<'_, Sqlite> { + fn conversations(&mut self) -> Conversations { + Conversations(self) + } +} + +pub struct Conversations<'t>(&'t mut SqliteConnection); + +impl Conversations<'_> { + pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> { + let id = Id::generate(); + let name = name.clone(); + let display_name = name.display(); + let canonical_name = name.canonical(); + let created = *created; + + sqlx::query!( + r#" + insert into conversation (id, created_at, created_sequence, last_sequence) + values ($1, $2, $3, $4) + "#, + id, + created.at, + created.sequence, + created.sequence, + ) + .execute(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_name (id, display_name, canonical_name) + values ($1, $2, $3) + "#, + id, + display_name, + canonical_name, + ) + .execute(&mut *self.0) + .await?; + + let conversation = History { + conversation: Conversation { + created, + id, + name: name.clone(), + deleted_at: None, + }, + deleted: None, + }; + + Ok(conversation) + } + + pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> { + let conversation = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where id = $1 + "#, + conversation, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch_one(&mut *self.0) + .await??; + + Ok(conversation) + } + + pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { + let conversations = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where conversation.created_sequence <= $1 + order by name.canonical_name + "#, + resume_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } + + pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { + let conversations = sqlx::query!( + r#" + select + id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + where conversation.last_sequence > $1 + "#, + resume_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } + + pub async fn delete( + &mut self, + conversation: &History, + deleted: &Instant, + ) -> Result<History, LoadError> { + let id = conversation.id(); + sqlx::query!( + r#" + update conversation + set last_sequence = max(last_sequence, $1) + where id = $2 + returning id as "id: Id" + "#, + deleted.sequence, + id, + ) + .fetch_one(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + deleted.at, + deleted.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a conversation is deleted, its + // name is retconned to have been the empty string. Someone reading the event + // stream afterwards, or looking at conversations via the API, cannot retrieve + // the "deleted" conversation's information by ignoring the deletion event. + sqlx::query!( + r#" + delete from conversation_name + where id = $1 + "#, + id, + ) + .execute(&mut *self.0) + .await?; + + let conversation = self.by_id(id).await?; + + Ok(conversation) + } + + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { + let conversations = sqlx::query_scalar!( + r#" + with has_messages as ( + select conversation + from message + group by conversation + ) + delete from conversation_deleted + where deleted_at < $1 + and id not in has_messages + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for conversation in conversations { + // Wanted: a way to batch these up into one query. + sqlx::query!( + r#" + delete from conversation + where id = $1 + "#, + conversation, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> { + let conversations = sqlx::query!( + r#" + select + conversation.id as "id: Id", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", + conversation.created_at as "created_at: DateTime", + conversation.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" + from conversation + left join conversation_name as name + using (id) + left join conversation_deleted as deleted + using (id) + left join message + on conversation.id = message.conversation + where conversation.created_at < $1 + and message.id is null + and deleted.id is null + "#, + expired_at, + ) + .map(|row| { + Ok::<_, name::Error>(History { + conversation: Conversation { + created: Instant::new(row.created_at, row.created_sequence), + id: row.id, + name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) + }) + .fetch(&mut *self.0) + .map(|res| Ok::<_, LoadError>(res??)) + .try_collect() + .await?; + + Ok(conversations) + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum LoadError { + Database(#[from] sqlx::Error), + Name(#[from] name::Error), +} + +impl<T> NotFound for Result<T, LoadError> { + type Ok = T; + type Error = LoadError; + + fn optional(self) -> Result<Option<T>, LoadError> { + match self { + Ok(value) => Ok(Some(value)), + Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None), + Err(other) => Err(other), + } + } +} diff --git a/src/conversation/snapshot.rs b/src/conversation/snapshot.rs new file mode 100644 index 0000000..da9eaae --- /dev/null +++ b/src/conversation/snapshot.rs @@ -0,0 +1,43 @@ +use super::{ + Id, + event::{Created, Event}, +}; +use crate::{clock::DateTime, event::Instant, name::Name}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Conversation { + #[serde(flatten)] + pub created: Instant, + pub id: Id, + pub name: Name, + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_at: Option<DateTime>, +} + +impl Conversation { + fn apply(state: Option<Self>, event: Event) -> Option<Self> { + match (state, event) { + (None, Event::Created(event)) => Some(event.into()), + (Some(conversation), Event::Deleted(event)) if conversation.id == event.id => None, + (state, event) => panic!("invalid conversation event {event:#?} for state {state:#?}"), + } + } +} + +impl FromIterator<Event> for Option<Conversation> { + fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self { + events.into_iter().fold(None, Conversation::apply) + } +} + +impl From<&Created> for Conversation { + fn from(event: &Created) -> Self { + event.conversation.clone() + } +} + +impl From<Created> for Conversation { + fn from(event: Created) -> Self { + event.conversation + } +} diff --git a/src/conversation/validate.rs b/src/conversation/validate.rs new file mode 100644 index 0000000..7894e0c --- /dev/null +++ b/src/conversation/validate.rs @@ -0,0 +1,25 @@ +use std::ops::Not as _; + +use unicode_segmentation::UnicodeSegmentation as _; + +use crate::name::Name; + +// Picked out of a hat. The power of two is not meaningful. +const NAME_TOO_LONG: usize = 64; + +pub fn name(name: &Name) -> bool { + let display = name.display(); + + [ + display.graphemes(true).count() < NAME_TOO_LONG, + display.chars().any(char::is_control).not(), + display.chars().next().is_some_and(|c| !c.is_whitespace()), + display.chars().last().is_some_and(|c| !c.is_whitespace()), + display + .chars() + .zip(display.chars().skip(1)) + .all(|(a, b)| !(a.is_whitespace() && b.is_whitespace())), + ] + .into_iter() + .all(|value| value) +} |
