diff options
Diffstat (limited to 'src/conversation/app.rs')
| -rw-r--r-- | src/conversation/app.rs | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/src/conversation/app.rs b/src/conversation/app.rs new file mode 100644 index 0000000..81ccdcf --- /dev/null +++ b/src/conversation/app.rs @@ -0,0 +1,236 @@ +use chrono::TimeDelta; +use itertools::Itertools; +use sqlx::sqlite::SqlitePool; + +use super::{ + Conversation, Id, + repo::{LoadError, Provider as _}, + validate, +}; +use crate::{ + clock::DateTime, + db::{Duplicate as _, NotFound as _}, + event::{Broadcaster, Event, Sequence, repo::Provider as _}, + message::{self, repo::Provider as _}, + name::{self, Name}, +}; + +pub struct Conversations<'a> { + db: &'a SqlitePool, + events: &'a Broadcaster, +} + +impl<'a> Conversations<'a> { + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } + } + + pub async fn create( + &self, + name: &Name, + created_at: &DateTime, + ) -> Result<Conversation, CreateError> { + if !validate::name(name) { + return Err(CreateError::InvalidName(name.clone())); + } + + let mut tx = self.db.begin().await?; + let created = tx.sequence().next(created_at).await?; + let conversation = tx + .conversations() + .create(name, &created) + .await + .duplicate(|| CreateError::DuplicateName(name.clone()))?; + tx.commit().await?; + + self.events + .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>()); + + Ok(conversation.as_created()) + } + + // This function is careless with respect to time, and gets you the + // conversation as it exists in the specific moment when you call it. + pub async fn get(&self, conversation: &Id) -> Result<Conversation, Error> { + let to_not_found = || Error::NotFound(conversation.clone()); + let to_deleted = || Error::Deleted(conversation.clone()); + + let mut tx = self.db.begin().await?; + let conversation = tx + .conversations() + .by_id(conversation) + .await + .not_found(to_not_found)?; + tx.commit().await?; + + conversation.as_snapshot().ok_or_else(to_deleted) + } + + pub async fn delete( + &self, + conversation: &Id, + deleted_at: &DateTime, + ) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + + let conversation = tx + .conversations() + .by_id(conversation) + .await + .not_found(|| DeleteError::NotFound(conversation.clone()))?; + conversation + .as_snapshot() + .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?; + + let mut events = Vec::new(); + + let messages = tx.messages().live(&conversation).await?; + let has_messages = messages + .iter() + .map(message::History::as_snapshot) + .any(|message| message.is_some()); + if has_messages { + return Err(DeleteError::NotEmpty(conversation.id().clone())); + } + + let deleted = tx.sequence().next(deleted_at).await?; + let conversation = tx.conversations().delete(&conversation, &deleted).await?; + events.extend( + conversation + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + + tx.commit().await?; + + self.events.broadcast(events); + + Ok(()) + } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> { + // Somewhat arbitrarily, expire after 7 days. Active conversation will not be + // expired until their messages expire. + let expire_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + let expired = tx.conversations().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for conversation in expired { + let deleted = tx.sequence().next(relative_to).await?; + let conversation = tx.conversations().delete(&conversation, &deleted).await?; + events.push( + conversation + .events() + .filter(Sequence::start_from(deleted.sequence)), + ); + } + + tx.commit().await?; + + self.events.broadcast( + events + .into_iter() + .kmerge_by(Sequence::merge) + .map(Event::from) + .collect::<Vec<_>>(), + ); + + Ok(()) + } + + pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, purge after 6 hours. + let purge_at = relative_to.to_owned() - TimeDelta::hours(6); + + let mut tx = self.db.begin().await?; + tx.conversations().purge(&purge_at).await?; + tx.commit().await?; + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum CreateError { + #[error("conversation named {0} already exists")] + DuplicateName(Name), + #[error("invalid conversation name: {0}")] + InvalidName(Name), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for CreateError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("conversation {0} not found")] + NotFound(Id), + #[error("conversation {0} deleted")] + Deleted(Id), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for Error { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("conversation {0} not found")] + NotFound(Id), + #[error("conversation {0} deleted")] + Deleted(Id), + #[error("conversation {0} not empty")] + NotEmpty(Id), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for DeleteError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ExpireError { + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for ExpireError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} |
