use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{Body, History, Id, Message, history, repo::Provider as _}; use crate::{ clock::DateTime, conversation::{self, repo::Provider as _}, db, db::NotFound as _, error::failed::{Failed, ResultExt as _}, event::{Broadcaster, Sequence, repo::Provider as _}, login::Login, user::{self, repo::Provider as _}, }; pub struct Messages { db: SqlitePool, events: Broadcaster, } impl Messages { pub const fn new(db: SqlitePool, events: Broadcaster) -> Self { Self { db, events } } pub async fn send( &self, conversation: &conversation::Id, sender: &Login, sent_at: &DateTime, body: &Body, ) -> Result { let conversation_not_found = || SendError::ConversationNotFound(conversation.clone()); let conversation_deleted = || SendError::ConversationDeleted(conversation.clone()); let sender_not_found = || SendError::SenderNotFound(sender.id.clone().into()); let sender_deleted = || SendError::SenderDeleted(sender.id.clone().into()); let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let conversation = tx .conversations() .by_id(conversation) .await .optional() .fail("Failed to load conversation")? .ok_or_else(conversation_not_found)?; let sender = tx .users() .by_login(sender) .await .optional() .fail("Failed to load sending user")? .ok_or_else(sender_not_found)?; // Ordering: don't bother allocating a sequence number before we know the channel might // exist. let sent = tx .sequence() .next(sent_at) .await .fail("Failed to find event sequence number")?; let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?; let sender = sender.as_of(sent).ok_or_else(sender_deleted)?; let message = History::begin(&conversation, &sender, body, sent); // This filter technically includes every event in the history, but it's easier to follow if // the various event-manipulating app methods are consistent, and it's harmless to have an // always-satisfied filter. let events = message.events().filter(Sequence::start_from(sent)); tx.messages() .record_events(events.clone()) .await .fail("Failed to store events")?; tx.commit().await.fail(db::failed::COMMIT)?; self.events.broadcast_from(events); Ok(message.as_sent()) } pub async fn delete( &self, deleted_by: &Login, message: &Id, deleted_at: &DateTime, ) -> Result<(), DeleteError> { let message_not_found = || DeleteError::MessageNotFound(message.clone()); let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into()); let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let message = tx .messages() .by_id(message) .await .optional() .fail("Failed to load message")? .ok_or_else(message_not_found)?; if message.sender() == &deleted_by.id { let deleted_at = tx .sequence() .next(deleted_at) .await .fail("Failed to find event sequence number")?; let message = message.delete(deleted_at)?; let events = message.events().filter(Sequence::start_from(deleted_at)); tx.messages() .record_events(events.clone()) .await .fail("Failed to store events")?; tx.commit().await.fail(db::failed::COMMIT)?; self.events.broadcast_from(events); Ok(()) } else { Err(not_sender()) } } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 30 days. let expire_at = relative_to.to_owned() - TimeDelta::days(30); let mut tx = self.db.begin().await?; let expired = tx.messages().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); for message in expired { let deleted = tx.sequence().next(relative_to).await?; match message.delete(deleted) { Ok(message) => { let message_events = message .events() .filter(Sequence::start_from(deleted.sequence)); tx.messages().record_events(message_events.clone()).await?; events.push(message_events); } Err(history::DeleteError::Deleted(_)) => {} } } tx.commit().await?; self.events .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); Ok(()) } pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, purge after 6 hours. let purge_at = relative_to.to_owned() - TimeDelta::hours(6); let mut tx = self.db.begin().await?; tx.messages().purge(&purge_at).await?; tx.commit().await?; Ok(()) } } #[derive(Debug, thiserror::Error)] pub enum SendError { #[error("conversation {0} not found")] ConversationNotFound(conversation::Id), #[error("conversation {0} deleted")] ConversationDeleted(conversation::Id), #[error("user {0} not found")] SenderNotFound(user::Id), #[error("user {0} deleted")] SenderDeleted(user::Id), #[error(transparent)] Failed(#[from] Failed), } #[derive(Debug, thiserror::Error)] pub enum DeleteError { #[error("message {0} not found")] MessageNotFound(Id), #[error("user {0} not the message's sender")] NotSender(user::Id), #[error("message {0} deleted")] Deleted(Id), #[error(transparent)] Failed(#[from] Failed), } impl From for DeleteError { fn from(error: history::DeleteError) -> Self { match error { history::DeleteError::Deleted(message) => Self::Deleted(message.id().clone()), } } }