diff options
Diffstat (limited to 'src/conversation/app.rs')
| -rw-r--r-- | src/conversation/app.rs | 155 |
1 files changed, 71 insertions, 84 deletions
diff --git a/src/conversation/app.rs b/src/conversation/app.rs index 2b62e77..73c655b 100644 --- a/src/conversation/app.rs +++ b/src/conversation/app.rs @@ -2,17 +2,14 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{ - Conversation, History, Id, history, - repo::{LoadError, Provider as _}, - validate, -}; +use super::{Conversation, History, Id, history, repo::Provider as _, validate}; use crate::{ clock::DateTime, - db::{Duplicate as _, NotFound as _}, + db::{self, NotFound as _}, + error::failed::{ErrorExt as _, Failed, ResultExt as _}, event::{Broadcaster, Sequence, repo::Provider as _}, message::repo::Provider as _, - name::{self, Name}, + name::Name, }; pub struct Conversations { @@ -34,8 +31,12 @@ impl Conversations { return Err(CreateError::InvalidName(name.clone())); } - let mut tx = self.db.begin().await?; - let created = tx.sequence().next(created_at).await?; + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + let created = tx + .sequence() + .next(created_at) + .await + .fail("Failed to find event sequence number")?; let conversation = History::begin(name, created); // This filter technically includes every event in the history, but it's easier to follow if @@ -45,9 +46,12 @@ impl Conversations { tx.conversations() .record_events(events.clone()) .await - .duplicate(|| CreateError::DuplicateName(name.clone()))?; + .map_err(|err| match err.as_database_error() { + Some(err) if err.is_unique_violation() => CreateError::DuplicateName(name.clone()), + _ => err.fail("Failed to store events"), + })?; - tx.commit().await?; + tx.commit().await.fail(db::failed::COMMIT)?; self.events.broadcast_from(events); @@ -56,17 +60,21 @@ impl Conversations { // This function is careless with respect to time, and gets you the // conversation as it exists in the specific moment when you call it. - pub async fn get(&self, conversation: &Id) -> Result<Conversation, Error> { - let to_not_found = || Error::NotFound(conversation.clone()); - let to_deleted = || Error::Deleted(conversation.clone()); + pub async fn get(&self, conversation: &Id) -> Result<Conversation, GetError> { + let to_not_found = || GetError::NotFound(conversation.clone()); + let to_deleted = || GetError::Deleted(conversation.clone()); + + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; - let mut tx = self.db.begin().await?; let conversation = tx .conversations() .by_id(conversation) .await - .not_found(to_not_found)?; - tx.commit().await?; + .optional() + .fail("Failed to load conversation")? + .ok_or_else(to_not_found)?; + + tx.commit().await.fail(db::failed::COMMIT)?; conversation.as_snapshot().ok_or_else(to_deleted) } @@ -76,16 +84,28 @@ impl Conversations { conversation: &Id, deleted_at: &DateTime, ) -> Result<(), DeleteError> { - let mut tx = self.db.begin().await?; + let to_not_found = || DeleteError::NotFound(conversation.clone()); + + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let conversation = tx .conversations() .by_id(conversation) .await - .not_found(|| DeleteError::NotFound(conversation.clone()))?; + .optional() + .fail("Failed to load conversation")? + .ok_or_else(to_not_found)?; - let messages = tx.messages().live(&conversation).await?; - let deleted_at = tx.sequence().next(deleted_at).await?; + let messages = tx + .messages() + .live(&conversation) + .await + .fail("Failed to load messages")?; + let deleted_at = tx + .sequence() + .next(deleted_at) + .await + .fail("Failed to find event sequence number")?; let has_messages = messages .iter() @@ -100,9 +120,12 @@ impl Conversations { let events = conversation .events() .filter(Sequence::start_from(deleted_at)); - tx.conversations().record_events(events.clone()).await?; + tx.conversations() + .record_events(events.clone()) + .await + .fail("Failed to store events")?; - tx.commit().await?; + tx.commit().await.fail(db::failed::COMMIT)?; self.events.broadcast_from(events); @@ -114,23 +137,33 @@ impl Conversations { // expired until their messages expire. let expire_at = relative_to.to_owned() - TimeDelta::days(7); - let mut tx = self.db.begin().await?; - let expired = tx.conversations().expired(&expire_at).await?; + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + + let expired = tx + .conversations() + .expired(&expire_at) + .await + .fail("Failed to load expirable conversations")?; let mut events = Vec::with_capacity(expired.len()); for conversation in expired { - let deleted = tx.sequence().next(relative_to).await?; + let deleted = tx + .sequence() + .next(relative_to) + .await + .fail("Failed to find event sequence number")?; let conversation = conversation.delete(deleted)?; let conversation_events = conversation.events().filter(Sequence::start_from(deleted)); tx.conversations() .record_events(conversation_events.clone()) - .await?; + .await + .fail("Failed to store events")?; events.push(conversation_events); } - tx.commit().await?; + tx.commit().await.fail(db::failed::COMMIT)?; self.events .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); @@ -157,39 +190,17 @@ pub enum CreateError { #[error("invalid conversation name: {0}")] InvalidName(Name), #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From<LoadError> for CreateError { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } + Failed(#[from] Failed), } #[derive(Debug, thiserror::Error)] -pub enum Error { +pub enum GetError { #[error("conversation {0} not found")] NotFound(Id), #[error("conversation {0} deleted")] Deleted(Id), #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From<LoadError> for Error { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } + Failed(#[from] Failed), } #[derive(Debug, thiserror::Error)] @@ -201,51 +212,27 @@ pub enum DeleteError { #[error("conversation {0} not empty")] NotEmpty(Id), #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From<LoadError> for DeleteError { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } + Failed(#[from] Failed), } impl From<history::DeleteError> for DeleteError { fn from(error: history::DeleteError) -> Self { - match error { - history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), - } + let history::DeleteError::Deleted(conversation) = error; + Self::Deleted(conversation.id().clone()) } } #[derive(Debug, thiserror::Error)] pub enum ExpireError { - #[error("tried to expire already-deleted conversation: {0}")] + #[error("tried to expire already-deleted conversation {0}")] Deleted(Id), #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From<LoadError> for ExpireError { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } + Failed(#[from] Failed), } impl From<history::DeleteError> for ExpireError { fn from(error: history::DeleteError) -> Self { - match error { - history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), - } + let history::DeleteError::Deleted(conversation) = error; + Self::Deleted(conversation.id().clone()) } } |
