summaryrefslogtreecommitdiff
path: root/src/conversation/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/conversation/app.rs')
-rw-r--r--src/conversation/app.rs155
1 files changed, 71 insertions, 84 deletions
diff --git a/src/conversation/app.rs b/src/conversation/app.rs
index 2b62e77..73c655b 100644
--- a/src/conversation/app.rs
+++ b/src/conversation/app.rs
@@ -2,17 +2,14 @@ use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
-use super::{
- Conversation, History, Id, history,
- repo::{LoadError, Provider as _},
- validate,
-};
+use super::{Conversation, History, Id, history, repo::Provider as _, validate};
use crate::{
clock::DateTime,
- db::{Duplicate as _, NotFound as _},
+ db::{self, NotFound as _},
+ error::failed::{ErrorExt as _, Failed, ResultExt as _},
event::{Broadcaster, Sequence, repo::Provider as _},
message::repo::Provider as _,
- name::{self, Name},
+ name::Name,
};
pub struct Conversations {
@@ -34,8 +31,12 @@ impl Conversations {
return Err(CreateError::InvalidName(name.clone()));
}
- let mut tx = self.db.begin().await?;
- let created = tx.sequence().next(created_at).await?;
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
+ let created = tx
+ .sequence()
+ .next(created_at)
+ .await
+ .fail("Failed to find event sequence number")?;
let conversation = History::begin(name, created);
// This filter technically includes every event in the history, but it's easier to follow if
@@ -45,9 +46,12 @@ impl Conversations {
tx.conversations()
.record_events(events.clone())
.await
- .duplicate(|| CreateError::DuplicateName(name.clone()))?;
+ .map_err(|err| match err.as_database_error() {
+ Some(err) if err.is_unique_violation() => CreateError::DuplicateName(name.clone()),
+ _ => err.fail("Failed to store events"),
+ })?;
- tx.commit().await?;
+ tx.commit().await.fail(db::failed::COMMIT)?;
self.events.broadcast_from(events);
@@ -56,17 +60,21 @@ impl Conversations {
// This function is careless with respect to time, and gets you the
// conversation as it exists in the specific moment when you call it.
- pub async fn get(&self, conversation: &Id) -> Result<Conversation, Error> {
- let to_not_found = || Error::NotFound(conversation.clone());
- let to_deleted = || Error::Deleted(conversation.clone());
+ pub async fn get(&self, conversation: &Id) -> Result<Conversation, GetError> {
+ let to_not_found = || GetError::NotFound(conversation.clone());
+ let to_deleted = || GetError::Deleted(conversation.clone());
+
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
- let mut tx = self.db.begin().await?;
let conversation = tx
.conversations()
.by_id(conversation)
.await
- .not_found(to_not_found)?;
- tx.commit().await?;
+ .optional()
+ .fail("Failed to load conversation")?
+ .ok_or_else(to_not_found)?;
+
+ tx.commit().await.fail(db::failed::COMMIT)?;
conversation.as_snapshot().ok_or_else(to_deleted)
}
@@ -76,16 +84,28 @@ impl Conversations {
conversation: &Id,
deleted_at: &DateTime,
) -> Result<(), DeleteError> {
- let mut tx = self.db.begin().await?;
+ let to_not_found = || DeleteError::NotFound(conversation.clone());
+
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
let conversation = tx
.conversations()
.by_id(conversation)
.await
- .not_found(|| DeleteError::NotFound(conversation.clone()))?;
+ .optional()
+ .fail("Failed to load conversation")?
+ .ok_or_else(to_not_found)?;
- let messages = tx.messages().live(&conversation).await?;
- let deleted_at = tx.sequence().next(deleted_at).await?;
+ let messages = tx
+ .messages()
+ .live(&conversation)
+ .await
+ .fail("Failed to load messages")?;
+ let deleted_at = tx
+ .sequence()
+ .next(deleted_at)
+ .await
+ .fail("Failed to find event sequence number")?;
let has_messages = messages
.iter()
@@ -100,9 +120,12 @@ impl Conversations {
let events = conversation
.events()
.filter(Sequence::start_from(deleted_at));
- tx.conversations().record_events(events.clone()).await?;
+ tx.conversations()
+ .record_events(events.clone())
+ .await
+ .fail("Failed to store events")?;
- tx.commit().await?;
+ tx.commit().await.fail(db::failed::COMMIT)?;
self.events.broadcast_from(events);
@@ -114,23 +137,33 @@ impl Conversations {
// expired until their messages expire.
let expire_at = relative_to.to_owned() - TimeDelta::days(7);
- let mut tx = self.db.begin().await?;
- let expired = tx.conversations().expired(&expire_at).await?;
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
+
+ let expired = tx
+ .conversations()
+ .expired(&expire_at)
+ .await
+ .fail("Failed to load expirable conversations")?;
let mut events = Vec::with_capacity(expired.len());
for conversation in expired {
- let deleted = tx.sequence().next(relative_to).await?;
+ let deleted = tx
+ .sequence()
+ .next(relative_to)
+ .await
+ .fail("Failed to find event sequence number")?;
let conversation = conversation.delete(deleted)?;
let conversation_events = conversation.events().filter(Sequence::start_from(deleted));
tx.conversations()
.record_events(conversation_events.clone())
- .await?;
+ .await
+ .fail("Failed to store events")?;
events.push(conversation_events);
}
- tx.commit().await?;
+ tx.commit().await.fail(db::failed::COMMIT)?;
self.events
.broadcast_from(events.into_iter().kmerge_by(Sequence::merge));
@@ -157,39 +190,17 @@ pub enum CreateError {
#[error("invalid conversation name: {0}")]
InvalidName(Name),
#[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for CreateError {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
+ Failed(#[from] Failed),
}
#[derive(Debug, thiserror::Error)]
-pub enum Error {
+pub enum GetError {
#[error("conversation {0} not found")]
NotFound(Id),
#[error("conversation {0} deleted")]
Deleted(Id),
#[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for Error {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
+ Failed(#[from] Failed),
}
#[derive(Debug, thiserror::Error)]
@@ -201,51 +212,27 @@ pub enum DeleteError {
#[error("conversation {0} not empty")]
NotEmpty(Id),
#[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for DeleteError {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
+ Failed(#[from] Failed),
}
impl From<history::DeleteError> for DeleteError {
fn from(error: history::DeleteError) -> Self {
- match error {
- history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()),
- }
+ let history::DeleteError::Deleted(conversation) = error;
+ Self::Deleted(conversation.id().clone())
}
}
#[derive(Debug, thiserror::Error)]
pub enum ExpireError {
- #[error("tried to expire already-deleted conversation: {0}")]
+ #[error("tried to expire already-deleted conversation {0}")]
Deleted(Id),
#[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for ExpireError {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
+ Failed(#[from] Failed),
}
impl From<history::DeleteError> for ExpireError {
fn from(error: history::DeleteError) -> Self {
- match error {
- history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()),
- }
+ let history::DeleteError::Deleted(conversation) = error;
+ Self::Deleted(conversation.id().clone())
}
}