use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{ Conversation, Id, repo::{LoadError, Provider as _}, validate, }; use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, event::{Broadcaster, Event, Sequence, repo::Provider as _}, message::{self, repo::Provider as _}, name::{self, Name}, }; pub struct Conversations<'a> { db: &'a SqlitePool, events: &'a Broadcaster, } impl<'a> Conversations<'a> { pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { Self { db, events } } pub async fn create( &self, name: &Name, created_at: &DateTime, ) -> Result { if !validate::name(name) { return Err(CreateError::InvalidName(name.clone())); } let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; let conversation = tx .conversations() .create(name, &created) .await .duplicate(|| CreateError::DuplicateName(name.clone()))?; tx.commit().await?; self.events .broadcast(conversation.events().map(Event::from).collect::>()); Ok(conversation.as_created()) } // This function is careless with respect to time, and gets you the // conversation as it exists in the specific moment when you call it. pub async fn get(&self, conversation: &Id) -> Result { let to_not_found = || Error::NotFound(conversation.clone()); let to_deleted = || Error::Deleted(conversation.clone()); let mut tx = self.db.begin().await?; let conversation = tx .conversations() .by_id(conversation) .await .not_found(to_not_found)?; tx.commit().await?; conversation.as_snapshot().ok_or_else(to_deleted) } pub async fn delete( &self, conversation: &Id, deleted_at: &DateTime, ) -> Result<(), DeleteError> { let mut tx = self.db.begin().await?; let conversation = tx .conversations() .by_id(conversation) .await .not_found(|| DeleteError::NotFound(conversation.clone()))?; conversation .as_snapshot() .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?; let mut events = Vec::new(); let messages = tx.messages().live(&conversation).await?; let has_messages = messages .iter() .map(message::History::as_snapshot) .any(|message| message.is_some()); if has_messages { return Err(DeleteError::NotEmpty(conversation.id().clone())); } let deleted = tx.sequence().next(deleted_at).await?; let conversation = tx.conversations().delete(&conversation, &deleted).await?; events.extend( conversation .events() .filter(Sequence::start_from(deleted.sequence)) .map(Event::from), ); tx.commit().await?; self.events.broadcast(events); Ok(()) } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> { // Somewhat arbitrarily, expire after 7 days. Active conversation will not be // expired until their messages expire. let expire_at = relative_to.to_owned() - TimeDelta::days(7); let mut tx = self.db.begin().await?; let expired = tx.conversations().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); for conversation in expired { let deleted = tx.sequence().next(relative_to).await?; let conversation = tx.conversations().delete(&conversation, &deleted).await?; events.push( conversation .events() .filter(Sequence::start_from(deleted.sequence)), ); } tx.commit().await?; self.events.broadcast( events .into_iter() .kmerge_by(Sequence::merge) .map(Event::from) .collect::>(), ); Ok(()) } pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, purge after 6 hours. let purge_at = relative_to.to_owned() - TimeDelta::hours(6); let mut tx = self.db.begin().await?; tx.conversations().purge(&purge_at).await?; tx.commit().await?; Ok(()) } } #[derive(Debug, thiserror::Error)] pub enum CreateError { #[error("conversation named {0} already exists")] DuplicateName(Name), #[error("invalid conversation name: {0}")] InvalidName(Name), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), } impl From for CreateError { fn from(error: LoadError) -> Self { match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } } #[derive(Debug, thiserror::Error)] pub enum Error { #[error("conversation {0} not found")] NotFound(Id), #[error("conversation {0} deleted")] Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), } impl From for Error { fn from(error: LoadError) -> Self { match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } } #[derive(Debug, thiserror::Error)] pub enum DeleteError { #[error("conversation {0} not found")] NotFound(Id), #[error("conversation {0} deleted")] Deleted(Id), #[error("conversation {0} not empty")] NotEmpty(Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), } impl From for DeleteError { fn from(error: LoadError) -> Self { match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } } #[derive(Debug, thiserror::Error)] pub enum ExpireError { #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), } impl From for ExpireError { fn from(error: LoadError) -> Self { match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } }