use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{Body, History, Id, Message, history, repo::Provider as _}; use crate::{ clock::DateTime, conversation::{self, repo::Provider as _}, db::NotFound as _, event::{Broadcaster, Sequence, repo::Provider as _}, login::Login, name, user::{self, repo::Provider as _}, }; pub struct Messages { db: SqlitePool, events: Broadcaster, } impl Messages { pub const fn new(db: SqlitePool, events: Broadcaster) -> Self { Self { db, events } } pub async fn send( &self, conversation: &conversation::Id, sender: &Login, sent_at: &DateTime, body: &Body, ) -> Result { let conversation_not_found = || SendError::ConversationNotFound(conversation.clone()); let conversation_deleted = || SendError::ConversationDeleted(conversation.clone()); let sender_not_found = || SendError::SenderNotFound(sender.id.clone().into()); let sender_deleted = || SendError::SenderDeleted(sender.id.clone().into()); let mut tx = self.db.begin().await?; let conversation = tx .conversations() .by_id(conversation) .await .not_found(conversation_not_found)?; let sender = tx .users() .by_login(sender) .await .not_found(sender_not_found)?; // Ordering: don't bother allocating a sequence number before we know the channel might // exist. let sent = tx.sequence().next(sent_at).await?; let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?; let sender = sender.as_of(sent).ok_or_else(sender_deleted)?; let message = History::begin(&conversation, &sender, body, sent); // This filter technically includes every event in the history, but it's easier to follow if // the various event-manipulating app methods are consistent, and it's harmless to have an // always-satisfied filter. let events = message.events().filter(Sequence::start_from(sent)); tx.messages().record_events(events.clone()).await?; tx.commit().await?; self.events.broadcast_from(events); Ok(message.as_sent()) } pub async fn delete( &self, deleted_by: &Login, message: &Id, deleted_at: &DateTime, ) -> Result<(), DeleteError> { let message_not_found = || DeleteError::MessageNotFound(message.clone()); let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into()); let mut tx = self.db.begin().await?; let message = tx .messages() .by_id(message) .await .not_found(message_not_found)?; if message.sender() == &deleted_by.id { let deleted_at = tx.sequence().next(deleted_at).await?; let message = message.delete(deleted_at)?; let events = message.events().filter(Sequence::start_from(deleted_at)); tx.messages().record_events(events.clone()).await?; tx.commit().await?; self.events.broadcast_from(events); Ok(()) } else { Err(not_sender()) } } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 30 days. let expire_at = relative_to.to_owned() - TimeDelta::days(30); let mut tx = self.db.begin().await?; let expired = tx.messages().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); for message in expired { let deleted = tx.sequence().next(relative_to).await?; match message.delete(deleted) { Ok(message) => { let message_events = message .events() .filter(Sequence::start_from(deleted.sequence)); tx.messages().record_events(message_events.clone()).await?; events.push(message_events); } Err(history::DeleteError::Deleted(_)) => {} } } tx.commit().await?; self.events .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); Ok(()) } pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, purge after 6 hours. let purge_at = relative_to.to_owned() - TimeDelta::hours(6); let mut tx = self.db.begin().await?; tx.messages().purge(&purge_at).await?; tx.commit().await?; Ok(()) } } #[derive(Debug, thiserror::Error)] pub enum SendError { #[error("conversation {0} not found")] ConversationNotFound(conversation::Id), #[error("conversation {0} deleted")] ConversationDeleted(conversation::Id), #[error("user {0} not found")] SenderNotFound(user::Id), #[error("user {0} deleted")] SenderDeleted(user::Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), } impl From for SendError { fn from(error: conversation::repo::LoadError) -> Self { use conversation::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } } impl From for SendError { fn from(error: user::repo::LoadError) -> Self { use user::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } } #[derive(Debug, thiserror::Error)] pub enum DeleteError { #[error("message {0} not found")] MessageNotFound(Id), #[error("user {0} not the message's sender")] NotSender(user::Id), #[error("message {0} deleted")] Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), } impl From for DeleteError { fn from(error: user::repo::LoadError) -> Self { use user::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } } impl From for DeleteError { fn from(error: history::DeleteError) -> Self { match error { history::DeleteError::Deleted(message) => Self::Deleted(message.id().clone()), } } }