use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{Body, Id, Message, repo::Provider as _}; use crate::{ clock::DateTime, conversation::{self, repo::Provider as _}, db::NotFound as _, event::{Broadcaster, Event, Sequence, repo::Provider as _}, login::Login, name, user::{self, repo::Provider as _}, }; pub struct Messages<'a> { db: &'a SqlitePool, events: &'a Broadcaster, } impl<'a> Messages<'a> { pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { Self { db, events } } pub async fn send( &self, conversation: &conversation::Id, sender: &Login, sent_at: &DateTime, body: &Body, ) -> Result { let conversation_not_found = || SendError::ConversationNotFound(conversation.clone()); let conversation_deleted = || SendError::ConversationDeleted(conversation.clone()); let sender_not_found = || SendError::SenderNotFound(sender.id.clone().into()); let sender_deleted = || SendError::SenderDeleted(sender.id.clone().into()); let mut tx = self.db.begin().await?; let conversation = tx .conversations() .by_id(conversation) .await .not_found(conversation_not_found)?; let sender = tx .users() .by_login(sender) .await .not_found(sender_not_found)?; // Ordering: don't bother allocating a sequence number before we know the channel might // exist. let sent = tx.sequence().next(sent_at).await?; let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?; let sender = sender.as_of(sent).ok_or_else(sender_deleted)?; let message = tx .messages() .create(&conversation, &sender, &sent, body) .await?; tx.commit().await?; self.events .broadcast(message.events().map(Event::from).collect::>()); Ok(message.as_sent()) } pub async fn delete( &self, deleted_by: &Login, message: &Id, deleted_at: &DateTime, ) -> Result<(), DeleteError> { let message_not_found = || DeleteError::MessageNotFound(message.clone()); let message_deleted = || DeleteError::Deleted(message.clone()); let deleter_not_found = || DeleteError::UserNotFound(deleted_by.id.clone().into()); let deleter_deleted = || DeleteError::UserDeleted(deleted_by.id.clone().into()); let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into()); let mut tx = self.db.begin().await?; let message = tx .messages() .by_id(message) .await .not_found(message_not_found)?; let deleted_by = tx .users() .by_login(deleted_by) .await .not_found(deleter_not_found)?; let deleted = tx.sequence().next(deleted_at).await?; let message = message.as_of(deleted).ok_or_else(message_deleted)?; let deleted_by = deleted_by.as_of(deleted).ok_or_else(deleter_deleted)?; if message.sender == deleted_by.id { let message = tx.messages().delete(&message, &deleted).await?; tx.commit().await?; self.events.broadcast( message .events() .filter(Sequence::start_from(deleted.sequence)) .map(Event::from) .collect::>(), ); Ok(()) } else { Err(not_sender()) } } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 30 days. let expire_at = relative_to.to_owned() - TimeDelta::days(30); let mut tx = self.db.begin().await?; let expired = tx.messages().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); for message in expired { let deleted = tx.sequence().next(relative_to).await?; if let Some(message) = message.as_of(deleted) { let message = tx.messages().delete(&message, &deleted).await?; events.push( message .events() .filter(Sequence::start_from(deleted.sequence)), ); } } tx.commit().await?; self.events.broadcast( events .into_iter() .kmerge_by(Sequence::merge) .map(Event::from) .collect::>(), ); Ok(()) } pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, purge after 6 hours. let purge_at = relative_to.to_owned() - TimeDelta::hours(6); let mut tx = self.db.begin().await?; tx.messages().purge(&purge_at).await?; tx.commit().await?; Ok(()) } } #[derive(Debug, thiserror::Error)] pub enum SendError { #[error("conversation {0} not found")] ConversationNotFound(conversation::Id), #[error("conversation {0} deleted")] ConversationDeleted(conversation::Id), #[error("user {0} not found")] SenderNotFound(user::Id), #[error("user {0} deleted")] SenderDeleted(user::Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), } impl From for SendError { fn from(error: conversation::repo::LoadError) -> Self { use conversation::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } } impl From for SendError { fn from(error: user::repo::LoadError) -> Self { use user::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } } #[derive(Debug, thiserror::Error)] pub enum DeleteError { #[error("message {0} not found")] MessageNotFound(Id), #[error("user {0} not found")] UserNotFound(user::Id), #[error("user {0} deleted")] UserDeleted(user::Id), #[error("user {0} not the message's sender")] NotSender(user::Id), #[error("message {0} deleted")] Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), } impl From for DeleteError { fn from(error: user::repo::LoadError) -> Self { use user::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } }