summaryrefslogtreecommitdiff
path: root/src/conversation
diff options
context:
space:
mode:
Diffstat (limited to 'src/conversation')
-rw-r--r--src/conversation/app.rs236
-rw-r--r--src/conversation/event.rs46
-rw-r--r--src/conversation/handlers/create/mod.rs67
-rw-r--r--src/conversation/handlers/create/test.rs250
-rw-r--r--src/conversation/handlers/delete/mod.rs61
-rw-r--r--src/conversation/handlers/delete/test.rs184
-rw-r--r--src/conversation/handlers/mod.rs9
-rw-r--r--src/conversation/handlers/send/mod.rs63
-rw-r--r--src/conversation/handlers/send/test.rs130
-rw-r--r--src/conversation/history.rs69
-rw-r--r--src/conversation/id.rs38
-rw-r--r--src/conversation/mod.rs10
-rw-r--r--src/conversation/repo.rs332
-rw-r--r--src/conversation/snapshot.rs43
-rw-r--r--src/conversation/validate.rs25
15 files changed, 1563 insertions, 0 deletions
diff --git a/src/conversation/app.rs b/src/conversation/app.rs
new file mode 100644
index 0000000..81ccdcf
--- /dev/null
+++ b/src/conversation/app.rs
@@ -0,0 +1,236 @@
+use chrono::TimeDelta;
+use itertools::Itertools;
+use sqlx::sqlite::SqlitePool;
+
+use super::{
+ Conversation, Id,
+ repo::{LoadError, Provider as _},
+ validate,
+};
+use crate::{
+ clock::DateTime,
+ db::{Duplicate as _, NotFound as _},
+ event::{Broadcaster, Event, Sequence, repo::Provider as _},
+ message::{self, repo::Provider as _},
+ name::{self, Name},
+};
+
+pub struct Conversations<'a> {
+ db: &'a SqlitePool,
+ events: &'a Broadcaster,
+}
+
+impl<'a> Conversations<'a> {
+ pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self {
+ Self { db, events }
+ }
+
+ pub async fn create(
+ &self,
+ name: &Name,
+ created_at: &DateTime,
+ ) -> Result<Conversation, CreateError> {
+ if !validate::name(name) {
+ return Err(CreateError::InvalidName(name.clone()));
+ }
+
+ let mut tx = self.db.begin().await?;
+ let created = tx.sequence().next(created_at).await?;
+ let conversation = tx
+ .conversations()
+ .create(name, &created)
+ .await
+ .duplicate(|| CreateError::DuplicateName(name.clone()))?;
+ tx.commit().await?;
+
+ self.events
+ .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>());
+
+ Ok(conversation.as_created())
+ }
+
+ // This function is careless with respect to time, and gets you the
+ // conversation as it exists in the specific moment when you call it.
+ pub async fn get(&self, conversation: &Id) -> Result<Conversation, Error> {
+ let to_not_found = || Error::NotFound(conversation.clone());
+ let to_deleted = || Error::Deleted(conversation.clone());
+
+ let mut tx = self.db.begin().await?;
+ let conversation = tx
+ .conversations()
+ .by_id(conversation)
+ .await
+ .not_found(to_not_found)?;
+ tx.commit().await?;
+
+ conversation.as_snapshot().ok_or_else(to_deleted)
+ }
+
+ pub async fn delete(
+ &self,
+ conversation: &Id,
+ deleted_at: &DateTime,
+ ) -> Result<(), DeleteError> {
+ let mut tx = self.db.begin().await?;
+
+ let conversation = tx
+ .conversations()
+ .by_id(conversation)
+ .await
+ .not_found(|| DeleteError::NotFound(conversation.clone()))?;
+ conversation
+ .as_snapshot()
+ .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?;
+
+ let mut events = Vec::new();
+
+ let messages = tx.messages().live(&conversation).await?;
+ let has_messages = messages
+ .iter()
+ .map(message::History::as_snapshot)
+ .any(|message| message.is_some());
+ if has_messages {
+ return Err(DeleteError::NotEmpty(conversation.id().clone()));
+ }
+
+ let deleted = tx.sequence().next(deleted_at).await?;
+ let conversation = tx.conversations().delete(&conversation, &deleted).await?;
+ events.extend(
+ conversation
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from),
+ );
+
+ tx.commit().await?;
+
+ self.events.broadcast(events);
+
+ Ok(())
+ }
+
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> {
+ // Somewhat arbitrarily, expire after 7 days. Active conversation will not be
+ // expired until their messages expire.
+ let expire_at = relative_to.to_owned() - TimeDelta::days(7);
+
+ let mut tx = self.db.begin().await?;
+ let expired = tx.conversations().expired(&expire_at).await?;
+
+ let mut events = Vec::with_capacity(expired.len());
+ for conversation in expired {
+ let deleted = tx.sequence().next(relative_to).await?;
+ let conversation = tx.conversations().delete(&conversation, &deleted).await?;
+ events.push(
+ conversation
+ .events()
+ .filter(Sequence::start_from(deleted.sequence)),
+ );
+ }
+
+ tx.commit().await?;
+
+ self.events.broadcast(
+ events
+ .into_iter()
+ .kmerge_by(Sequence::merge)
+ .map(Event::from)
+ .collect::<Vec<_>>(),
+ );
+
+ Ok(())
+ }
+
+ pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, purge after 6 hours.
+ let purge_at = relative_to.to_owned() - TimeDelta::hours(6);
+
+ let mut tx = self.db.begin().await?;
+ tx.conversations().purge(&purge_at).await?;
+ tx.commit().await?;
+
+ Ok(())
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum CreateError {
+ #[error("conversation named {0} already exists")]
+ DuplicateName(Name),
+ #[error("invalid conversation name: {0}")]
+ InvalidName(Name),
+ #[error(transparent)]
+ Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Name(#[from] name::Error),
+}
+
+impl From<LoadError> for CreateError {
+ fn from(error: LoadError) -> Self {
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ #[error("conversation {0} not found")]
+ NotFound(Id),
+ #[error("conversation {0} deleted")]
+ Deleted(Id),
+ #[error(transparent)]
+ Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Name(#[from] name::Error),
+}
+
+impl From<LoadError> for Error {
+ fn from(error: LoadError) -> Self {
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum DeleteError {
+ #[error("conversation {0} not found")]
+ NotFound(Id),
+ #[error("conversation {0} deleted")]
+ Deleted(Id),
+ #[error("conversation {0} not empty")]
+ NotEmpty(Id),
+ #[error(transparent)]
+ Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Name(#[from] name::Error),
+}
+
+impl From<LoadError> for DeleteError {
+ fn from(error: LoadError) -> Self {
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum ExpireError {
+ #[error(transparent)]
+ Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Name(#[from] name::Error),
+}
+
+impl From<LoadError> for ExpireError {
+ fn from(error: LoadError) -> Self {
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
+}
diff --git a/src/conversation/event.rs b/src/conversation/event.rs
new file mode 100644
index 0000000..f5e8a81
--- /dev/null
+++ b/src/conversation/event.rs
@@ -0,0 +1,46 @@
+use super::Conversation;
+use crate::{
+ conversation,
+ event::{Instant, Sequenced},
+};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+#[serde(tag = "event", rename_all = "snake_case")]
+pub enum Event {
+ Created(Created),
+ Deleted(Deleted),
+}
+
+impl Sequenced for Event {
+ fn instant(&self) -> Instant {
+ match self {
+ Self::Created(event) => event.conversation.created,
+ Self::Deleted(event) => event.instant,
+ }
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Created {
+ #[serde(flatten)]
+ pub conversation: Conversation,
+}
+
+impl From<Created> for Event {
+ fn from(event: Created) -> Self {
+ Self::Created(event)
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Deleted {
+ #[serde(flatten)]
+ pub instant: Instant,
+ pub id: conversation::Id,
+}
+
+impl From<Deleted> for Event {
+ fn from(event: Deleted) -> Self {
+ Self::Deleted(event)
+ }
+}
diff --git a/src/conversation/handlers/create/mod.rs b/src/conversation/handlers/create/mod.rs
new file mode 100644
index 0000000..18eca1f
--- /dev/null
+++ b/src/conversation/handlers/create/mod.rs
@@ -0,0 +1,67 @@
+use axum::{
+ extract::{Json, State},
+ http::StatusCode,
+ response::{self, IntoResponse},
+};
+
+use crate::{
+ app::App,
+ clock::RequestedAt,
+ conversation::{Conversation, app},
+ error::Internal,
+ name::Name,
+ token::extract::Identity,
+};
+
+#[cfg(test)]
+mod test;
+
+pub async fn handler(
+ State(app): State<App>,
+ _: Identity, // requires auth, but doesn't actually care who you are
+ RequestedAt(created_at): RequestedAt,
+ Json(request): Json<Request>,
+) -> Result<Response, Error> {
+ let conversation = app
+ .conversations()
+ .create(&request.name, &created_at)
+ .await
+ .map_err(Error)?;
+
+ Ok(Response(conversation))
+}
+
+#[derive(serde::Deserialize)]
+pub struct Request {
+ pub name: Name,
+}
+
+#[derive(Debug)]
+pub struct Response(pub Conversation);
+
+impl IntoResponse for Response {
+ fn into_response(self) -> response::Response {
+ let Self(conversation) = self;
+ (StatusCode::ACCEPTED, Json(conversation)).into_response()
+ }
+}
+
+#[derive(Debug)]
+pub struct Error(pub app::CreateError);
+
+impl IntoResponse for Error {
+ fn into_response(self) -> response::Response {
+ let Self(error) = self;
+ match error {
+ app::CreateError::DuplicateName(_) => {
+ (StatusCode::CONFLICT, error.to_string()).into_response()
+ }
+ app::CreateError::InvalidName(_) => {
+ (StatusCode::BAD_REQUEST, error.to_string()).into_response()
+ }
+ app::CreateError::Name(_) | app::CreateError::Database(_) => {
+ Internal::from(error).into_response()
+ }
+ }
+ }
+}
diff --git a/src/conversation/handlers/create/test.rs b/src/conversation/handlers/create/test.rs
new file mode 100644
index 0000000..bc05b00
--- /dev/null
+++ b/src/conversation/handlers/create/test.rs
@@ -0,0 +1,250 @@
+use std::future;
+
+use axum::extract::{Json, State};
+use futures::stream::StreamExt as _;
+use itertools::Itertools;
+
+use crate::{
+ conversation::app,
+ name::Name,
+ test::fixtures::{self, future::Expect as _},
+};
+
+#[tokio::test]
+async fn new_conversation() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Call the endpoint
+
+ let name = fixtures::conversation::propose();
+ let request = super::Request { name: name.clone() };
+ let super::Response(response) =
+ super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("creating a conversation in an empty app succeeds");
+
+ // Verify the structure of the response
+
+ assert_eq!(name, response.name);
+
+ // Verify the semantics
+
+ let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
+ let created = snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::conversation)
+ .filter_map(fixtures::event::conversation::created)
+ .exactly_one()
+ .expect("only one conversation has been created");
+ assert_eq!(response, created.conversation);
+
+ let conversation = app
+ .conversations()
+ .get(&response.id)
+ .await
+ .expect("the newly-created conversation exists");
+ assert_eq!(response, conversation);
+
+ let mut events = app
+ .events()
+ .subscribe(resume_point)
+ .await
+ .expect("subscribing never fails")
+ .filter_map(fixtures::event::stream::conversation)
+ .filter_map(fixtures::event::stream::conversation::created)
+ .filter(|event| future::ready(event.conversation == response));
+
+ let event = events.next().expect_some("creation event published").await;
+
+ assert_eq!(event.conversation, response);
+}
+
+#[tokio::test]
+async fn duplicate_name() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let request = super::Request {
+ name: conversation.name.clone(),
+ };
+ let super::Error(error) =
+ super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect_err("duplicate conversation name should fail the request");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ app::CreateError::DuplicateName(name) if conversation.name == name
+ ));
+}
+
+#[tokio::test]
+async fn conflicting_canonical_name() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let existing_name = Name::from("rijksmuseum");
+ app.conversations()
+ .create(&existing_name, &fixtures::now())
+ .await
+ .expect("creating a conversation in an empty environment succeeds");
+
+ let conflicting_name = Name::from("r\u{0133}ksmuseum");
+
+ // Call the endpoint
+
+ let request = super::Request {
+ name: conflicting_name.clone(),
+ };
+ let super::Error(error) =
+ super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect_err("duplicate conversation name should fail the request");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ app::CreateError::DuplicateName(name) if conflicting_name == name
+ ));
+}
+
+#[tokio::test]
+async fn invalid_name() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let name = fixtures::conversation::propose_invalid_name();
+ let request = super::Request { name: name.clone() };
+ let super::Error(error) = crate::conversation::handlers::create::handler(
+ State(app.clone()),
+ creator,
+ fixtures::now(),
+ Json(request),
+ )
+ .await
+ .expect_err("invalid conversation name should fail the request");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ app::CreateError::InvalidName(error_name) if name == error_name
+ ));
+}
+
+#[tokio::test]
+async fn name_reusable_after_delete() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+ let name = fixtures::conversation::propose();
+
+ // Call the endpoint (first time)
+
+ let request = super::Request { name: name.clone() };
+ let super::Response(response) = super::handler(
+ State(app.clone()),
+ creator.clone(),
+ fixtures::now(),
+ Json(request),
+ )
+ .await
+ .expect("new conversation in an empty app");
+
+ // Delete the conversation
+
+ app.conversations()
+ .delete(&response.id, &fixtures::now())
+ .await
+ .expect("deleting a newly-created conversation succeeds");
+
+ // Call the endpoint (second time)
+
+ let request = super::Request { name: name.clone() };
+ let super::Response(response) =
+ super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("creation succeeds after original conversation deleted");
+
+ // Verify the structure of the response
+
+ assert_eq!(name, response.name);
+
+ // Verify the semantics
+
+ let conversation = app
+ .conversations()
+ .get(&response.id)
+ .await
+ .expect("the newly-created conversation exists");
+ assert_eq!(response, conversation);
+}
+
+#[tokio::test]
+async fn name_reusable_after_expiry() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::ancient()).await;
+ let name = fixtures::conversation::propose();
+
+ // Call the endpoint (first time)
+
+ let request = super::Request { name: name.clone() };
+ let super::Response(_) = super::handler(
+ State(app.clone()),
+ creator.clone(),
+ fixtures::ancient(),
+ Json(request),
+ )
+ .await
+ .expect("new conversation in an empty app");
+
+ // Expire the conversation
+
+ app.conversations()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiry always succeeds");
+
+ // Call the endpoint (second time)
+
+ let request = super::Request { name: name.clone() };
+ let super::Response(response) =
+ super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("creation succeeds after original conversation expired");
+
+ // Verify the structure of the response
+
+ assert_eq!(name, response.name);
+
+ // Verify the semantics
+
+ let conversation = app
+ .conversations()
+ .get(&response.id)
+ .await
+ .expect("the newly-created conversation exists");
+ assert_eq!(response, conversation);
+}
diff --git a/src/conversation/handlers/delete/mod.rs b/src/conversation/handlers/delete/mod.rs
new file mode 100644
index 0000000..272165a
--- /dev/null
+++ b/src/conversation/handlers/delete/mod.rs
@@ -0,0 +1,61 @@
+use axum::{
+ extract::{Json, Path, State},
+ http::StatusCode,
+ response::{self, IntoResponse},
+};
+
+use crate::{
+ app::App,
+ clock::RequestedAt,
+ conversation::{self, app, handlers::PathInfo},
+ error::{Internal, NotFound},
+ token::extract::Identity,
+};
+
+#[cfg(test)]
+mod test;
+
+pub async fn handler(
+ State(app): State<App>,
+ Path(conversation): Path<PathInfo>,
+ RequestedAt(deleted_at): RequestedAt,
+ _: Identity,
+) -> Result<Response, Error> {
+ app.conversations()
+ .delete(&conversation, &deleted_at)
+ .await?;
+
+ Ok(Response { id: conversation })
+}
+
+#[derive(Debug, serde::Serialize)]
+pub struct Response {
+ pub id: conversation::Id,
+}
+
+impl IntoResponse for Response {
+ fn into_response(self) -> response::Response {
+ (StatusCode::ACCEPTED, Json(self)).into_response()
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub struct Error(#[from] pub app::DeleteError);
+
+impl IntoResponse for Error {
+ fn into_response(self) -> response::Response {
+ let Self(error) = self;
+ match error {
+ app::DeleteError::NotFound(_) | app::DeleteError::Deleted(_) => {
+ NotFound(error).into_response()
+ }
+ app::DeleteError::NotEmpty(_) => {
+ (StatusCode::CONFLICT, error.to_string()).into_response()
+ }
+ app::DeleteError::Name(_) | app::DeleteError::Database(_) => {
+ Internal::from(error).into_response()
+ }
+ }
+ }
+}
diff --git a/src/conversation/handlers/delete/test.rs b/src/conversation/handlers/delete/test.rs
new file mode 100644
index 0000000..2718d3b
--- /dev/null
+++ b/src/conversation/handlers/delete/test.rs
@@ -0,0 +1,184 @@
+use axum::extract::{Path, State};
+use itertools::Itertools;
+
+use crate::{conversation::app, test::fixtures};
+
+#[tokio::test]
+pub async fn valid_conversation() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let response = super::handler(
+ State(app.clone()),
+ Path(conversation.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect("deleting a valid conversation succeeds");
+
+ // Verify the response
+
+ assert_eq!(conversation.id, response.id);
+
+ // Verify the semantics
+
+ let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
+ let created = snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::conversation)
+ .filter_map(fixtures::event::conversation::created)
+ .exactly_one()
+ .expect("only one conversation has been created");
+ // We don't expect `conversation` to match the event exactly, as the name will have
+ // been tombstoned and the conversation given a `deleted_at` date.
+ assert_eq!(conversation.id, created.conversation.id);
+}
+
+#[tokio::test]
+pub async fn invalid_conversation_id() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let conversation = fixtures::conversation::fictitious();
+ let super::Error(error) = super::handler(
+ State(app.clone()),
+ Path(conversation.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting a nonexistent conversation fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::DeleteError::NotFound(id) if id == conversation));
+}
+
+#[tokio::test]
+pub async fn conversation_deleted() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
+
+ app.conversations()
+ .delete(&conversation.id, &fixtures::now())
+ .await
+ .expect("deleting a recently-created conversation succeeds");
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Error(error) = super::handler(
+ State(app.clone()),
+ Path(conversation.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting a deleted conversation fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::DeleteError::Deleted(id) if id == conversation.id));
+}
+
+#[tokio::test]
+pub async fn conversation_expired() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await;
+
+ app.conversations()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring conversations always succeeds");
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Error(error) = super::handler(
+ State(app.clone()),
+ Path(conversation.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting an expired conversation fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::DeleteError::Deleted(id) if id == conversation.id));
+}
+
+#[tokio::test]
+pub async fn conversation_purged() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await;
+
+ app.conversations()
+ .expire(&fixtures::old())
+ .await
+ .expect("expiring conversations always succeeds");
+
+ app.conversations()
+ .purge(&fixtures::now())
+ .await
+ .expect("purging conversations always succeeds");
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Error(error) = super::handler(
+ State(app.clone()),
+ Path(conversation.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting a purged conversation fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::DeleteError::NotFound(id) if id == conversation.id));
+}
+
+#[tokio::test]
+pub async fn conversation_not_empty() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await;
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Error(error) = super::handler(
+ State(app.clone()),
+ Path(conversation.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting a conversation with messages fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == conversation.id));
+}
diff --git a/src/conversation/handlers/mod.rs b/src/conversation/handlers/mod.rs
new file mode 100644
index 0000000..2fe727c
--- /dev/null
+++ b/src/conversation/handlers/mod.rs
@@ -0,0 +1,9 @@
+mod create;
+mod delete;
+mod send;
+
+pub use create::handler as create;
+pub use delete::handler as delete;
+pub use send::handler as send;
+
+type PathInfo = crate::conversation::Id;
diff --git a/src/conversation/handlers/send/mod.rs b/src/conversation/handlers/send/mod.rs
new file mode 100644
index 0000000..9ec020a
--- /dev/null
+++ b/src/conversation/handlers/send/mod.rs
@@ -0,0 +1,63 @@
+use axum::{
+ extract::{Json, Path, State},
+ http::StatusCode,
+ response::{self, IntoResponse},
+};
+
+use crate::conversation::handlers::PathInfo;
+use crate::{
+ app::App,
+ clock::RequestedAt,
+ error::{Internal, NotFound},
+ message::{Body, Message, app::SendError},
+ token::extract::Identity,
+};
+
+#[cfg(test)]
+mod test;
+
+pub async fn handler(
+ State(app): State<App>,
+ Path(conversation): Path<PathInfo>,
+ RequestedAt(sent_at): RequestedAt,
+ identity: Identity,
+ Json(request): Json<Request>,
+) -> Result<Response, Error> {
+ let message = app
+ .messages()
+ .send(&conversation, &identity.user, &sent_at, &request.body)
+ .await?;
+
+ Ok(Response(message))
+}
+
+#[derive(serde::Deserialize)]
+pub struct Request {
+ pub body: Body,
+}
+
+#[derive(Debug)]
+pub struct Response(pub Message);
+
+impl IntoResponse for Response {
+ fn into_response(self) -> response::Response {
+ let Self(message) = self;
+ (StatusCode::ACCEPTED, Json(message)).into_response()
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub struct Error(#[from] pub SendError);
+
+impl IntoResponse for Error {
+ fn into_response(self) -> response::Response {
+ let Self(error) = self;
+ match error {
+ SendError::ConversationNotFound(_) | SendError::ConversationDeleted(_) => {
+ NotFound(error).into_response()
+ }
+ SendError::Name(_) | SendError::Database(_) => Internal::from(error).into_response(),
+ }
+ }
+}
diff --git a/src/conversation/handlers/send/test.rs b/src/conversation/handlers/send/test.rs
new file mode 100644
index 0000000..bd32510
--- /dev/null
+++ b/src/conversation/handlers/send/test.rs
@@ -0,0 +1,130 @@
+use axum::extract::{Json, Path, State};
+use futures::stream::{self, StreamExt as _};
+
+use crate::{
+ conversation,
+ event::Sequenced,
+ message::app::SendError,
+ test::fixtures::{self, future::Expect as _},
+};
+
+#[tokio::test]
+async fn messages_in_order() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::identity::create(&app, &fixtures::now()).await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Call the endpoint (twice)
+
+ let requests = vec![
+ (fixtures::now(), fixtures::message::propose()),
+ (fixtures::now(), fixtures::message::propose()),
+ ];
+
+ for (sent_at, body) in &requests {
+ let request = super::Request { body: body.clone() };
+
+ let _ = super::handler(
+ State(app.clone()),
+ Path(conversation.id.clone()),
+ sent_at.clone(),
+ sender.clone(),
+ Json(request),
+ )
+ .await
+ .expect("sending to a valid conversation succeeds");
+ }
+
+ // Verify the semantics
+
+ let mut events = app
+ .events()
+ .subscribe(resume_point)
+ .await
+ .expect("subscribing always succeeds")
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
+ .zip(stream::iter(requests));
+
+ while let Some((event, (sent_at, body))) = events
+ .next()
+ .expect_ready("an event should be ready for each message")
+ .await
+ {
+ assert_eq!(*sent_at, event.at());
+ assert_eq!(sender.user.id, event.message.sender);
+ assert_eq!(body, event.message.body);
+ }
+}
+
+#[tokio::test]
+async fn nonexistent_conversation() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let sent_at = fixtures::now();
+ let conversation = conversation::Id::generate();
+ let request = super::Request {
+ body: fixtures::message::propose(),
+ };
+ let super::Error(error) = super::handler(
+ State(app),
+ Path(conversation.clone()),
+ sent_at,
+ sender,
+ Json(request),
+ )
+ .await
+ .expect_err("sending to a nonexistent conversation fails");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ SendError::ConversationNotFound(error_conversation) if conversation == error_conversation
+ ));
+}
+
+#[tokio::test]
+async fn deleted_conversation() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::identity::create(&app, &fixtures::now()).await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
+
+ app.conversations()
+ .delete(&conversation.id, &fixtures::now())
+ .await
+ .expect("deleting a new conversation succeeds");
+
+ // Call the endpoint
+
+ let sent_at = fixtures::now();
+ let request = super::Request {
+ body: fixtures::message::propose(),
+ };
+ let super::Error(error) = super::handler(
+ State(app),
+ Path(conversation.id.clone()),
+ sent_at,
+ sender,
+ Json(request),
+ )
+ .await
+ .expect_err("sending to a deleted conversation fails");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ SendError::ConversationDeleted(error_conversation) if conversation.id == error_conversation
+ ));
+}
diff --git a/src/conversation/history.rs b/src/conversation/history.rs
new file mode 100644
index 0000000..601614c
--- /dev/null
+++ b/src/conversation/history.rs
@@ -0,0 +1,69 @@
+use itertools::Itertools as _;
+
+use super::{
+ Conversation, Id,
+ event::{Created, Deleted, Event},
+};
+use crate::event::{Instant, Sequence};
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct History {
+ pub conversation: Conversation,
+ pub deleted: Option<Instant>,
+}
+
+// State interface
+impl History {
+ pub fn id(&self) -> &Id {
+ &self.conversation.id
+ }
+
+ // Snapshot of this conversation as it was when created. (Note to the future:
+ // it's okay if this returns a redacted or modified version of the conversation.
+ // If we implement renames by redacting the original name, then this should
+ // return the renamed conversation, not the original, even if that's not how
+ // it was "as created.")
+ pub fn as_created(&self) -> Conversation {
+ self.conversation.clone()
+ }
+
+ pub fn as_of<S>(&self, sequence: S) -> Option<Conversation>
+ where
+ S: Into<Sequence>,
+ {
+ self.events()
+ .filter(Sequence::up_to(sequence.into()))
+ .collect()
+ }
+
+ // Snapshot of this conversation as of all events recorded in this history.
+ pub fn as_snapshot(&self) -> Option<Conversation> {
+ self.events().collect()
+ }
+}
+
+// Event factories
+impl History {
+ pub fn events(&self) -> impl Iterator<Item = Event> + use<> {
+ [self.created()]
+ .into_iter()
+ .merge_by(self.deleted(), Sequence::merge)
+ }
+
+ fn created(&self) -> Event {
+ Created {
+ conversation: self.conversation.clone(),
+ }
+ .into()
+ }
+
+ fn deleted(&self) -> Option<Event> {
+ self.deleted.map(|instant| {
+ Deleted {
+ instant,
+ id: self.conversation.id.clone(),
+ }
+ .into()
+ })
+ }
+}
diff --git a/src/conversation/id.rs b/src/conversation/id.rs
new file mode 100644
index 0000000..5f37a59
--- /dev/null
+++ b/src/conversation/id.rs
@@ -0,0 +1,38 @@
+use std::fmt;
+
+use crate::id::Id as BaseId;
+
+// Stable identifier for a [Conversation]. Prefixed with `C`.
+#[derive(
+ Clone,
+ Debug,
+ Eq,
+ Hash,
+ Ord,
+ PartialEq,
+ PartialOrd,
+ sqlx::Type,
+ serde::Deserialize,
+ serde::Serialize,
+)]
+#[sqlx(transparent)]
+#[serde(transparent)]
+pub struct Id(BaseId);
+
+impl From<BaseId> for Id {
+ fn from(id: BaseId) -> Self {
+ Self(id)
+ }
+}
+
+impl Id {
+ pub fn generate() -> Self {
+ BaseId::generate("C")
+ }
+}
+
+impl fmt::Display for Id {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.0.fmt(f)
+ }
+}
diff --git a/src/conversation/mod.rs b/src/conversation/mod.rs
new file mode 100644
index 0000000..3dfa187
--- /dev/null
+++ b/src/conversation/mod.rs
@@ -0,0 +1,10 @@
+pub mod app;
+pub mod event;
+pub mod handlers;
+mod history;
+mod id;
+pub mod repo;
+mod snapshot;
+mod validate;
+
+pub use self::{event::Event, history::History, id::Id, snapshot::Conversation};
diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs
new file mode 100644
index 0000000..82b5f01
--- /dev/null
+++ b/src/conversation/repo.rs
@@ -0,0 +1,332 @@
+use futures::stream::{StreamExt as _, TryStreamExt as _};
+use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
+
+use crate::{
+ clock::DateTime,
+ conversation::{Conversation, History, Id},
+ db::NotFound,
+ event::{Instant, Sequence},
+ name::{self, Name},
+};
+
+pub trait Provider {
+ fn conversations(&mut self) -> Conversations;
+}
+
+impl Provider for Transaction<'_, Sqlite> {
+ fn conversations(&mut self) -> Conversations {
+ Conversations(self)
+ }
+}
+
+pub struct Conversations<'t>(&'t mut SqliteConnection);
+
+impl Conversations<'_> {
+ pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> {
+ let id = Id::generate();
+ let name = name.clone();
+ let display_name = name.display();
+ let canonical_name = name.canonical();
+ let created = *created;
+
+ sqlx::query!(
+ r#"
+ insert into conversation (id, created_at, created_sequence, last_sequence)
+ values ($1, $2, $3, $4)
+ "#,
+ id,
+ created.at,
+ created.sequence,
+ created.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ insert into conversation_name (id, display_name, canonical_name)
+ values ($1, $2, $3)
+ "#,
+ id,
+ display_name,
+ canonical_name,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ let conversation = History {
+ conversation: Conversation {
+ created,
+ id,
+ name: name.clone(),
+ deleted_at: None,
+ },
+ deleted: None,
+ };
+
+ Ok(conversation)
+ }
+
+ pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> {
+ let conversation = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ conversation.created_at as "created_at: DateTime",
+ conversation.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
+ from conversation
+ left join conversation_name as name
+ using (id)
+ left join conversation_deleted as deleted
+ using (id)
+ where id = $1
+ "#,
+ conversation,
+ )
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ conversation: Conversation {
+ created: Instant::new(row.created_at, row.created_sequence),
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch_one(&mut *self.0)
+ .await??;
+
+ Ok(conversation)
+ }
+
+ pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
+ let conversations = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ conversation.created_at as "created_at: DateTime",
+ conversation.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
+ from conversation
+ left join conversation_name as name
+ using (id)
+ left join conversation_deleted as deleted
+ using (id)
+ where conversation.created_sequence <= $1
+ order by name.canonical_name
+ "#,
+ resume_at,
+ )
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ conversation: Conversation {
+ created: Instant::new(row.created_at, row.created_sequence),
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
+ .await?;
+
+ Ok(conversations)
+ }
+
+ pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
+ let conversations = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ conversation.created_at as "created_at: DateTime",
+ conversation.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
+ from conversation
+ left join conversation_name as name
+ using (id)
+ left join conversation_deleted as deleted
+ using (id)
+ where conversation.last_sequence > $1
+ "#,
+ resume_at,
+ )
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ conversation: Conversation {
+ created: Instant::new(row.created_at, row.created_sequence),
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
+ .await?;
+
+ Ok(conversations)
+ }
+
+ pub async fn delete(
+ &mut self,
+ conversation: &History,
+ deleted: &Instant,
+ ) -> Result<History, LoadError> {
+ let id = conversation.id();
+ sqlx::query!(
+ r#"
+ update conversation
+ set last_sequence = max(last_sequence, $1)
+ where id = $2
+ returning id as "id: Id"
+ "#,
+ deleted.sequence,
+ id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ insert into conversation_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ "#,
+ id,
+ deleted.at,
+ deleted.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a conversation is deleted, its
+ // name is retconned to have been the empty string. Someone reading the event
+ // stream afterwards, or looking at conversations via the API, cannot retrieve
+ // the "deleted" conversation's information by ignoring the deletion event.
+ sqlx::query!(
+ r#"
+ delete from conversation_name
+ where id = $1
+ "#,
+ id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ let conversation = self.by_id(id).await?;
+
+ Ok(conversation)
+ }
+
+ pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
+ let conversations = sqlx::query_scalar!(
+ r#"
+ with has_messages as (
+ select conversation
+ from message
+ group by conversation
+ )
+ delete from conversation_deleted
+ where deleted_at < $1
+ and id not in has_messages
+ returning id as "id: Id"
+ "#,
+ purge_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ for conversation in conversations {
+ // Wanted: a way to batch these up into one query.
+ sqlx::query!(
+ r#"
+ delete from conversation
+ where id = $1
+ "#,
+ conversation,
+ )
+ .execute(&mut *self.0)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> {
+ let conversations = sqlx::query!(
+ r#"
+ select
+ conversation.id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ conversation.created_at as "created_at: DateTime",
+ conversation.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
+ from conversation
+ left join conversation_name as name
+ using (id)
+ left join conversation_deleted as deleted
+ using (id)
+ left join message
+ on conversation.id = message.conversation
+ where conversation.created_at < $1
+ and message.id is null
+ and deleted.id is null
+ "#,
+ expired_at,
+ )
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ conversation: Conversation {
+ created: Instant::new(row.created_at, row.created_sequence),
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
+ .await?;
+
+ Ok(conversations)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum LoadError {
+ Database(#[from] sqlx::Error),
+ Name(#[from] name::Error),
+}
+
+impl<T> NotFound for Result<T, LoadError> {
+ type Ok = T;
+ type Error = LoadError;
+
+ fn optional(self) -> Result<Option<T>, LoadError> {
+ match self {
+ Ok(value) => Ok(Some(value)),
+ Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None),
+ Err(other) => Err(other),
+ }
+ }
+}
diff --git a/src/conversation/snapshot.rs b/src/conversation/snapshot.rs
new file mode 100644
index 0000000..da9eaae
--- /dev/null
+++ b/src/conversation/snapshot.rs
@@ -0,0 +1,43 @@
+use super::{
+ Id,
+ event::{Created, Event},
+};
+use crate::{clock::DateTime, event::Instant, name::Name};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Conversation {
+ #[serde(flatten)]
+ pub created: Instant,
+ pub id: Id,
+ pub name: Name,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub deleted_at: Option<DateTime>,
+}
+
+impl Conversation {
+ fn apply(state: Option<Self>, event: Event) -> Option<Self> {
+ match (state, event) {
+ (None, Event::Created(event)) => Some(event.into()),
+ (Some(conversation), Event::Deleted(event)) if conversation.id == event.id => None,
+ (state, event) => panic!("invalid conversation event {event:#?} for state {state:#?}"),
+ }
+ }
+}
+
+impl FromIterator<Event> for Option<Conversation> {
+ fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self {
+ events.into_iter().fold(None, Conversation::apply)
+ }
+}
+
+impl From<&Created> for Conversation {
+ fn from(event: &Created) -> Self {
+ event.conversation.clone()
+ }
+}
+
+impl From<Created> for Conversation {
+ fn from(event: Created) -> Self {
+ event.conversation
+ }
+}
diff --git a/src/conversation/validate.rs b/src/conversation/validate.rs
new file mode 100644
index 0000000..7894e0c
--- /dev/null
+++ b/src/conversation/validate.rs
@@ -0,0 +1,25 @@
+use std::ops::Not as _;
+
+use unicode_segmentation::UnicodeSegmentation as _;
+
+use crate::name::Name;
+
+// Picked out of a hat. The power of two is not meaningful.
+const NAME_TOO_LONG: usize = 64;
+
+pub fn name(name: &Name) -> bool {
+ let display = name.display();
+
+ [
+ display.graphemes(true).count() < NAME_TOO_LONG,
+ display.chars().any(char::is_control).not(),
+ display.chars().next().is_some_and(|c| !c.is_whitespace()),
+ display.chars().last().is_some_and(|c| !c.is_whitespace()),
+ display
+ .chars()
+ .zip(display.chars().skip(1))
+ .all(|(a, b)| !(a.is_whitespace() && b.is_whitespace())),
+ ]
+ .into_iter()
+ .all(|value| value)
+}