use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{Conversation, History, Id, history, repo::Provider as _, validate}; use crate::{ clock::DateTime, db::{self, NotFound as _}, error::failed::{ErrorExt as _, Failed, ResultExt as _}, event::{Broadcaster, Sequence, repo::Provider as _}, message::repo::Provider as _, name::Name, }; pub struct Conversations { db: SqlitePool, events: Broadcaster, } impl Conversations { pub const fn new(db: SqlitePool, events: Broadcaster) -> Self { Self { db, events } } pub async fn create( &self, name: &Name, created_at: &DateTime, ) -> Result { if !validate::name(name) { return Err(CreateError::InvalidName(name.clone())); } let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let created = tx .sequence() .next(created_at) .await .fail("Failed to find event sequence number")?; let conversation = History::begin(name, created); // This filter technically includes every event in the history, but it's easier to follow if // the various event-manipulating app methods are consistent, and it's harmless to have an // always-satisfied filter. let events = conversation.events().filter(Sequence::start_from(created)); tx.conversations() .record_events(events.clone()) .await .map_err(|err| match err.as_database_error() { Some(err) if err.is_unique_violation() => CreateError::DuplicateName(name.clone()), _ => err.fail("Failed to store events"), })?; tx.commit().await.fail(db::failed::COMMIT)?; self.events.broadcast_from(events); Ok(conversation.as_created()) } // This function is careless with respect to time, and gets you the // conversation as it exists in the specific moment when you call it. pub async fn get(&self, conversation: &Id) -> Result { let to_not_found = || GetError::NotFound(conversation.clone()); let to_deleted = || GetError::Deleted(conversation.clone()); let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let conversation = tx .conversations() .by_id(conversation) .await .optional() .fail("Failed to load conversation")? .ok_or_else(to_not_found)?; tx.commit().await.fail(db::failed::COMMIT)?; conversation.as_snapshot().ok_or_else(to_deleted) } pub async fn delete( &self, conversation: &Id, deleted_at: &DateTime, ) -> Result<(), DeleteError> { let to_not_found = || DeleteError::NotFound(conversation.clone()); let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let conversation = tx .conversations() .by_id(conversation) .await .optional() .fail("Failed to load conversation")? .ok_or_else(to_not_found)?; let messages = tx .messages() .live(&conversation) .await .fail("Failed to load messages")?; let deleted_at = tx .sequence() .next(deleted_at) .await .fail("Failed to find event sequence number")?; let has_messages = messages .iter() .map(|message| message.as_of(deleted_at)) .any(|message| message.is_some()); if has_messages { return Err(DeleteError::NotEmpty(conversation.id().clone())); } let conversation = conversation.delete(deleted_at)?; let events = conversation .events() .filter(Sequence::start_from(deleted_at)); tx.conversations() .record_events(events.clone()) .await .fail("Failed to store events")?; tx.commit().await.fail(db::failed::COMMIT)?; self.events.broadcast_from(events); Ok(()) } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> { // Somewhat arbitrarily, expire after 7 days. Active conversation will not be // expired until their messages expire. let expire_at = relative_to.to_owned() - TimeDelta::days(7); let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let expired = tx .conversations() .expired(&expire_at) .await .fail("Failed to load expirable conversations")?; let mut events = Vec::with_capacity(expired.len()); for conversation in expired { let deleted = tx .sequence() .next(relative_to) .await .fail("Failed to find event sequence number")?; let conversation = conversation.delete(deleted)?; let conversation_events = conversation.events().filter(Sequence::start_from(deleted)); tx.conversations() .record_events(conversation_events.clone()) .await .fail("Failed to store events")?; events.push(conversation_events); } tx.commit().await.fail(db::failed::COMMIT)?; self.events .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); Ok(()) } pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, purge after 6 hours. let purge_at = relative_to.to_owned() - TimeDelta::hours(6); let mut tx = self.db.begin().await?; tx.conversations().purge(&purge_at).await?; tx.commit().await?; Ok(()) } } #[derive(Debug, thiserror::Error)] pub enum CreateError { #[error("conversation named {0} already exists")] DuplicateName(Name), #[error("invalid conversation name: {0}")] InvalidName(Name), #[error(transparent)] Failed(#[from] Failed), } #[derive(Debug, thiserror::Error)] pub enum GetError { #[error("conversation {0} not found")] NotFound(Id), #[error("conversation {0} deleted")] Deleted(Id), #[error(transparent)] Failed(#[from] Failed), } #[derive(Debug, thiserror::Error)] pub enum DeleteError { #[error("conversation {0} not found")] NotFound(Id), #[error("conversation {0} deleted")] Deleted(Id), #[error("conversation {0} not empty")] NotEmpty(Id), #[error(transparent)] Failed(#[from] Failed), } impl From for DeleteError { fn from(error: history::DeleteError) -> Self { let history::DeleteError::Deleted(conversation) = error; Self::Deleted(conversation.id().clone()) } } #[derive(Debug, thiserror::Error)] pub enum ExpireError { #[error("tried to expire already-deleted conversation {0}")] Deleted(Id), #[error(transparent)] Failed(#[from] Failed), } impl From for ExpireError { fn from(error: history::DeleteError) -> Self { let history::DeleteError::Deleted(conversation) = error; Self::Deleted(conversation.id().clone()) } }