use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{repo::Provider as _, Body, Id, Message}; use crate::{ channel::{self, repo::Provider as _}, clock::DateTime, db::NotFound as _, event::{repo::Provider as _, Broadcaster, Event, Sequence}, login::Login, name, }; pub struct Messages<'a> { db: &'a SqlitePool, events: &'a Broadcaster, } impl<'a> Messages<'a> { pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { Self { db, events } } pub async fn send( &self, channel: &channel::Id, sender: &Login, sent_at: &DateTime, body: &Body, ) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| SendError::ChannelNotFound(channel.clone()))?; let sent = tx.sequence().next(sent_at).await?; let message = tx.messages().create(&channel, sender, &sent, body).await?; tx.commit().await?; self.events .broadcast(message.events().map(Event::from).collect::>()); Ok(message.as_sent()) } pub async fn delete( &self, deleted_by: &Login, message: &Id, deleted_at: &DateTime, ) -> Result<(), DeleteError> { let mut tx = self.db.begin().await?; let message = tx .messages() .by_id(message) .await .not_found(|| DeleteError::NotFound(message.clone()))?; let snapshot = message .as_snapshot() .ok_or_else(|| DeleteError::Deleted(message.id().clone()))?; if snapshot.sender != deleted_by.id { return Err(DeleteError::NotSender(deleted_by.clone())); } let deleted = tx.sequence().next(deleted_at).await?; let message = tx.messages().delete(&message, &deleted).await?; tx.commit().await?; self.events.broadcast( message .events() .filter(Sequence::start_from(deleted.sequence)) .map(Event::from) .collect::>(), ); Ok(()) } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 15 days. let expire_at = relative_to.to_owned() - TimeDelta::days(15); let mut tx = self.db.begin().await?; let expired = tx.messages().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); for message in expired { let deleted = tx.sequence().next(relative_to).await?; let message = tx.messages().delete(&message, &deleted).await?; events.push( message .events() .filter(Sequence::start_from(deleted.sequence)), ); } tx.commit().await?; self.events.broadcast( events .into_iter() .kmerge_by(Sequence::merge) .map(Event::from) .collect::>(), ); Ok(()) } pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, purge after 6 hours. let purge_at = relative_to.to_owned() - TimeDelta::hours(6); let mut tx = self.db.begin().await?; tx.messages().purge(&purge_at).await?; tx.commit().await?; Ok(()) } } #[derive(Debug, thiserror::Error)] pub enum SendError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), } impl From for SendError { fn from(error: channel::repo::LoadError) -> Self { use channel::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), } } } #[derive(Debug, thiserror::Error)] pub enum DeleteError { #[error("message {0} not found")] NotFound(Id), #[error("login {} not the message's sender", .0.id)] NotSender(Login), #[error("message {0} deleted")] Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), }