summaryrefslogtreecommitdiff
path: root/src/conversation/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'src/conversation/handlers')
-rw-r--r--src/conversation/handlers/create/mod.rs67
-rw-r--r--src/conversation/handlers/create/test.rs250
-rw-r--r--src/conversation/handlers/delete/mod.rs61
-rw-r--r--src/conversation/handlers/delete/test.rs184
-rw-r--r--src/conversation/handlers/mod.rs9
-rw-r--r--src/conversation/handlers/send/mod.rs63
-rw-r--r--src/conversation/handlers/send/test.rs130
7 files changed, 764 insertions, 0 deletions
diff --git a/src/conversation/handlers/create/mod.rs b/src/conversation/handlers/create/mod.rs
new file mode 100644
index 0000000..18eca1f
--- /dev/null
+++ b/src/conversation/handlers/create/mod.rs
@@ -0,0 +1,67 @@
+use axum::{
+ extract::{Json, State},
+ http::StatusCode,
+ response::{self, IntoResponse},
+};
+
+use crate::{
+ app::App,
+ clock::RequestedAt,
+ conversation::{Conversation, app},
+ error::Internal,
+ name::Name,
+ token::extract::Identity,
+};
+
+#[cfg(test)]
+mod test;
+
+pub async fn handler(
+ State(app): State<App>,
+ _: Identity, // requires auth, but doesn't actually care who you are
+ RequestedAt(created_at): RequestedAt,
+ Json(request): Json<Request>,
+) -> Result<Response, Error> {
+ let conversation = app
+ .conversations()
+ .create(&request.name, &created_at)
+ .await
+ .map_err(Error)?;
+
+ Ok(Response(conversation))
+}
+
+#[derive(serde::Deserialize)]
+pub struct Request {
+ pub name: Name,
+}
+
+#[derive(Debug)]
+pub struct Response(pub Conversation);
+
+impl IntoResponse for Response {
+ fn into_response(self) -> response::Response {
+ let Self(conversation) = self;
+ (StatusCode::ACCEPTED, Json(conversation)).into_response()
+ }
+}
+
+#[derive(Debug)]
+pub struct Error(pub app::CreateError);
+
+impl IntoResponse for Error {
+ fn into_response(self) -> response::Response {
+ let Self(error) = self;
+ match error {
+ app::CreateError::DuplicateName(_) => {
+ (StatusCode::CONFLICT, error.to_string()).into_response()
+ }
+ app::CreateError::InvalidName(_) => {
+ (StatusCode::BAD_REQUEST, error.to_string()).into_response()
+ }
+ app::CreateError::Name(_) | app::CreateError::Database(_) => {
+ Internal::from(error).into_response()
+ }
+ }
+ }
+}
diff --git a/src/conversation/handlers/create/test.rs b/src/conversation/handlers/create/test.rs
new file mode 100644
index 0000000..bc05b00
--- /dev/null
+++ b/src/conversation/handlers/create/test.rs
@@ -0,0 +1,250 @@
+use std::future;
+
+use axum::extract::{Json, State};
+use futures::stream::StreamExt as _;
+use itertools::Itertools;
+
+use crate::{
+ conversation::app,
+ name::Name,
+ test::fixtures::{self, future::Expect as _},
+};
+
+#[tokio::test]
+async fn new_conversation() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Call the endpoint
+
+ let name = fixtures::conversation::propose();
+ let request = super::Request { name: name.clone() };
+ let super::Response(response) =
+ super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("creating a conversation in an empty app succeeds");
+
+ // Verify the structure of the response
+
+ assert_eq!(name, response.name);
+
+ // Verify the semantics
+
+ let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
+ let created = snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::conversation)
+ .filter_map(fixtures::event::conversation::created)
+ .exactly_one()
+ .expect("only one conversation has been created");
+ assert_eq!(response, created.conversation);
+
+ let conversation = app
+ .conversations()
+ .get(&response.id)
+ .await
+ .expect("the newly-created conversation exists");
+ assert_eq!(response, conversation);
+
+ let mut events = app
+ .events()
+ .subscribe(resume_point)
+ .await
+ .expect("subscribing never fails")
+ .filter_map(fixtures::event::stream::conversation)
+ .filter_map(fixtures::event::stream::conversation::created)
+ .filter(|event| future::ready(event.conversation == response));
+
+ let event = events.next().expect_some("creation event published").await;
+
+ assert_eq!(event.conversation, response);
+}
+
+#[tokio::test]
+async fn duplicate_name() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let request = super::Request {
+ name: conversation.name.clone(),
+ };
+ let super::Error(error) =
+ super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect_err("duplicate conversation name should fail the request");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ app::CreateError::DuplicateName(name) if conversation.name == name
+ ));
+}
+
+#[tokio::test]
+async fn conflicting_canonical_name() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let existing_name = Name::from("rijksmuseum");
+ app.conversations()
+ .create(&existing_name, &fixtures::now())
+ .await
+ .expect("creating a conversation in an empty environment succeeds");
+
+ let conflicting_name = Name::from("r\u{0133}ksmuseum");
+
+ // Call the endpoint
+
+ let request = super::Request {
+ name: conflicting_name.clone(),
+ };
+ let super::Error(error) =
+ super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect_err("duplicate conversation name should fail the request");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ app::CreateError::DuplicateName(name) if conflicting_name == name
+ ));
+}
+
+#[tokio::test]
+async fn invalid_name() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let name = fixtures::conversation::propose_invalid_name();
+ let request = super::Request { name: name.clone() };
+ let super::Error(error) = crate::conversation::handlers::create::handler(
+ State(app.clone()),
+ creator,
+ fixtures::now(),
+ Json(request),
+ )
+ .await
+ .expect_err("invalid conversation name should fail the request");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ app::CreateError::InvalidName(error_name) if name == error_name
+ ));
+}
+
+#[tokio::test]
+async fn name_reusable_after_delete() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+ let name = fixtures::conversation::propose();
+
+ // Call the endpoint (first time)
+
+ let request = super::Request { name: name.clone() };
+ let super::Response(response) = super::handler(
+ State(app.clone()),
+ creator.clone(),
+ fixtures::now(),
+ Json(request),
+ )
+ .await
+ .expect("new conversation in an empty app");
+
+ // Delete the conversation
+
+ app.conversations()
+ .delete(&response.id, &fixtures::now())
+ .await
+ .expect("deleting a newly-created conversation succeeds");
+
+ // Call the endpoint (second time)
+
+ let request = super::Request { name: name.clone() };
+ let super::Response(response) =
+ super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("creation succeeds after original conversation deleted");
+
+ // Verify the structure of the response
+
+ assert_eq!(name, response.name);
+
+ // Verify the semantics
+
+ let conversation = app
+ .conversations()
+ .get(&response.id)
+ .await
+ .expect("the newly-created conversation exists");
+ assert_eq!(response, conversation);
+}
+
+#[tokio::test]
+async fn name_reusable_after_expiry() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::ancient()).await;
+ let name = fixtures::conversation::propose();
+
+ // Call the endpoint (first time)
+
+ let request = super::Request { name: name.clone() };
+ let super::Response(_) = super::handler(
+ State(app.clone()),
+ creator.clone(),
+ fixtures::ancient(),
+ Json(request),
+ )
+ .await
+ .expect("new conversation in an empty app");
+
+ // Expire the conversation
+
+ app.conversations()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiry always succeeds");
+
+ // Call the endpoint (second time)
+
+ let request = super::Request { name: name.clone() };
+ let super::Response(response) =
+ super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("creation succeeds after original conversation expired");
+
+ // Verify the structure of the response
+
+ assert_eq!(name, response.name);
+
+ // Verify the semantics
+
+ let conversation = app
+ .conversations()
+ .get(&response.id)
+ .await
+ .expect("the newly-created conversation exists");
+ assert_eq!(response, conversation);
+}
diff --git a/src/conversation/handlers/delete/mod.rs b/src/conversation/handlers/delete/mod.rs
new file mode 100644
index 0000000..272165a
--- /dev/null
+++ b/src/conversation/handlers/delete/mod.rs
@@ -0,0 +1,61 @@
+use axum::{
+ extract::{Json, Path, State},
+ http::StatusCode,
+ response::{self, IntoResponse},
+};
+
+use crate::{
+ app::App,
+ clock::RequestedAt,
+ conversation::{self, app, handlers::PathInfo},
+ error::{Internal, NotFound},
+ token::extract::Identity,
+};
+
+#[cfg(test)]
+mod test;
+
+pub async fn handler(
+ State(app): State<App>,
+ Path(conversation): Path<PathInfo>,
+ RequestedAt(deleted_at): RequestedAt,
+ _: Identity,
+) -> Result<Response, Error> {
+ app.conversations()
+ .delete(&conversation, &deleted_at)
+ .await?;
+
+ Ok(Response { id: conversation })
+}
+
+#[derive(Debug, serde::Serialize)]
+pub struct Response {
+ pub id: conversation::Id,
+}
+
+impl IntoResponse for Response {
+ fn into_response(self) -> response::Response {
+ (StatusCode::ACCEPTED, Json(self)).into_response()
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub struct Error(#[from] pub app::DeleteError);
+
+impl IntoResponse for Error {
+ fn into_response(self) -> response::Response {
+ let Self(error) = self;
+ match error {
+ app::DeleteError::NotFound(_) | app::DeleteError::Deleted(_) => {
+ NotFound(error).into_response()
+ }
+ app::DeleteError::NotEmpty(_) => {
+ (StatusCode::CONFLICT, error.to_string()).into_response()
+ }
+ app::DeleteError::Name(_) | app::DeleteError::Database(_) => {
+ Internal::from(error).into_response()
+ }
+ }
+ }
+}
diff --git a/src/conversation/handlers/delete/test.rs b/src/conversation/handlers/delete/test.rs
new file mode 100644
index 0000000..2718d3b
--- /dev/null
+++ b/src/conversation/handlers/delete/test.rs
@@ -0,0 +1,184 @@
+use axum::extract::{Path, State};
+use itertools::Itertools;
+
+use crate::{conversation::app, test::fixtures};
+
+#[tokio::test]
+pub async fn valid_conversation() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let response = super::handler(
+ State(app.clone()),
+ Path(conversation.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect("deleting a valid conversation succeeds");
+
+ // Verify the response
+
+ assert_eq!(conversation.id, response.id);
+
+ // Verify the semantics
+
+ let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
+ let created = snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::conversation)
+ .filter_map(fixtures::event::conversation::created)
+ .exactly_one()
+ .expect("only one conversation has been created");
+ // We don't expect `conversation` to match the event exactly, as the name will have
+ // been tombstoned and the conversation given a `deleted_at` date.
+ assert_eq!(conversation.id, created.conversation.id);
+}
+
+#[tokio::test]
+pub async fn invalid_conversation_id() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let conversation = fixtures::conversation::fictitious();
+ let super::Error(error) = super::handler(
+ State(app.clone()),
+ Path(conversation.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting a nonexistent conversation fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::DeleteError::NotFound(id) if id == conversation));
+}
+
+#[tokio::test]
+pub async fn conversation_deleted() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
+
+ app.conversations()
+ .delete(&conversation.id, &fixtures::now())
+ .await
+ .expect("deleting a recently-created conversation succeeds");
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Error(error) = super::handler(
+ State(app.clone()),
+ Path(conversation.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting a deleted conversation fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::DeleteError::Deleted(id) if id == conversation.id));
+}
+
+#[tokio::test]
+pub async fn conversation_expired() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await;
+
+ app.conversations()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring conversations always succeeds");
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Error(error) = super::handler(
+ State(app.clone()),
+ Path(conversation.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting an expired conversation fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::DeleteError::Deleted(id) if id == conversation.id));
+}
+
+#[tokio::test]
+pub async fn conversation_purged() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await;
+
+ app.conversations()
+ .expire(&fixtures::old())
+ .await
+ .expect("expiring conversations always succeeds");
+
+ app.conversations()
+ .purge(&fixtures::now())
+ .await
+ .expect("purging conversations always succeeds");
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Error(error) = super::handler(
+ State(app.clone()),
+ Path(conversation.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting a purged conversation fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::DeleteError::NotFound(id) if id == conversation.id));
+}
+
+#[tokio::test]
+pub async fn conversation_not_empty() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
+ let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await;
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Error(error) = super::handler(
+ State(app.clone()),
+ Path(conversation.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting a conversation with messages fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == conversation.id));
+}
diff --git a/src/conversation/handlers/mod.rs b/src/conversation/handlers/mod.rs
new file mode 100644
index 0000000..2fe727c
--- /dev/null
+++ b/src/conversation/handlers/mod.rs
@@ -0,0 +1,9 @@
+mod create;
+mod delete;
+mod send;
+
+pub use create::handler as create;
+pub use delete::handler as delete;
+pub use send::handler as send;
+
+type PathInfo = crate::conversation::Id;
diff --git a/src/conversation/handlers/send/mod.rs b/src/conversation/handlers/send/mod.rs
new file mode 100644
index 0000000..9ec020a
--- /dev/null
+++ b/src/conversation/handlers/send/mod.rs
@@ -0,0 +1,63 @@
+use axum::{
+ extract::{Json, Path, State},
+ http::StatusCode,
+ response::{self, IntoResponse},
+};
+
+use crate::conversation::handlers::PathInfo;
+use crate::{
+ app::App,
+ clock::RequestedAt,
+ error::{Internal, NotFound},
+ message::{Body, Message, app::SendError},
+ token::extract::Identity,
+};
+
+#[cfg(test)]
+mod test;
+
+pub async fn handler(
+ State(app): State<App>,
+ Path(conversation): Path<PathInfo>,
+ RequestedAt(sent_at): RequestedAt,
+ identity: Identity,
+ Json(request): Json<Request>,
+) -> Result<Response, Error> {
+ let message = app
+ .messages()
+ .send(&conversation, &identity.user, &sent_at, &request.body)
+ .await?;
+
+ Ok(Response(message))
+}
+
+#[derive(serde::Deserialize)]
+pub struct Request {
+ pub body: Body,
+}
+
+#[derive(Debug)]
+pub struct Response(pub Message);
+
+impl IntoResponse for Response {
+ fn into_response(self) -> response::Response {
+ let Self(message) = self;
+ (StatusCode::ACCEPTED, Json(message)).into_response()
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub struct Error(#[from] pub SendError);
+
+impl IntoResponse for Error {
+ fn into_response(self) -> response::Response {
+ let Self(error) = self;
+ match error {
+ SendError::ConversationNotFound(_) | SendError::ConversationDeleted(_) => {
+ NotFound(error).into_response()
+ }
+ SendError::Name(_) | SendError::Database(_) => Internal::from(error).into_response(),
+ }
+ }
+}
diff --git a/src/conversation/handlers/send/test.rs b/src/conversation/handlers/send/test.rs
new file mode 100644
index 0000000..bd32510
--- /dev/null
+++ b/src/conversation/handlers/send/test.rs
@@ -0,0 +1,130 @@
+use axum::extract::{Json, Path, State};
+use futures::stream::{self, StreamExt as _};
+
+use crate::{
+ conversation,
+ event::Sequenced,
+ message::app::SendError,
+ test::fixtures::{self, future::Expect as _},
+};
+
+#[tokio::test]
+async fn messages_in_order() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::identity::create(&app, &fixtures::now()).await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Call the endpoint (twice)
+
+ let requests = vec![
+ (fixtures::now(), fixtures::message::propose()),
+ (fixtures::now(), fixtures::message::propose()),
+ ];
+
+ for (sent_at, body) in &requests {
+ let request = super::Request { body: body.clone() };
+
+ let _ = super::handler(
+ State(app.clone()),
+ Path(conversation.id.clone()),
+ sent_at.clone(),
+ sender.clone(),
+ Json(request),
+ )
+ .await
+ .expect("sending to a valid conversation succeeds");
+ }
+
+ // Verify the semantics
+
+ let mut events = app
+ .events()
+ .subscribe(resume_point)
+ .await
+ .expect("subscribing always succeeds")
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
+ .zip(stream::iter(requests));
+
+ while let Some((event, (sent_at, body))) = events
+ .next()
+ .expect_ready("an event should be ready for each message")
+ .await
+ {
+ assert_eq!(*sent_at, event.at());
+ assert_eq!(sender.user.id, event.message.sender);
+ assert_eq!(body, event.message.body);
+ }
+}
+
+#[tokio::test]
+async fn nonexistent_conversation() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let sent_at = fixtures::now();
+ let conversation = conversation::Id::generate();
+ let request = super::Request {
+ body: fixtures::message::propose(),
+ };
+ let super::Error(error) = super::handler(
+ State(app),
+ Path(conversation.clone()),
+ sent_at,
+ sender,
+ Json(request),
+ )
+ .await
+ .expect_err("sending to a nonexistent conversation fails");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ SendError::ConversationNotFound(error_conversation) if conversation == error_conversation
+ ));
+}
+
+#[tokio::test]
+async fn deleted_conversation() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::identity::create(&app, &fixtures::now()).await;
+ let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
+
+ app.conversations()
+ .delete(&conversation.id, &fixtures::now())
+ .await
+ .expect("deleting a new conversation succeeds");
+
+ // Call the endpoint
+
+ let sent_at = fixtures::now();
+ let request = super::Request {
+ body: fixtures::message::propose(),
+ };
+ let super::Error(error) = super::handler(
+ State(app),
+ Path(conversation.id.clone()),
+ sent_at,
+ sender,
+ Json(request),
+ )
+ .await
+ .expect_err("sending to a deleted conversation fails");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ SendError::ConversationDeleted(error_conversation) if conversation.id == error_conversation
+ ));
+}