use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{Body, History, Id, Message, history, repo::Provider as _}; use crate::{ clock::DateTime, conversation::{self, repo::Provider as _}, db, db::NotFound as _, error::failed::{Failed, ResultExt as _}, event::{Broadcaster, Event, Sequence, repo::Provider as _}, login::Login, push::{Publish, repo::Provider as _}, user::{self, repo::Provider as _}, vapid::repo::Provider as _, }; pub struct Messages
{ db: SqlitePool, events: Broadcaster, publisher: P, } impl
Messages
{ pub const fn new(db: SqlitePool, events: Broadcaster, publisher: P) -> Self { Self { db, events, publisher, } } pub async fn delete( &self, deleted_by: &Login, message: &Id, deleted_at: &DateTime, ) -> Result<(), DeleteError> { let message_not_found = || DeleteError::MessageNotFound(message.clone()); let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into()); let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let message = tx .messages() .by_id(message) .await .optional() .fail("Failed to load message")? .ok_or_else(message_not_found)?; if message.sender() == &deleted_by.id { let deleted_at = tx .sequence() .next(deleted_at) .await .fail("Failed to find event sequence number")?; let message = message.delete(deleted_at)?; let events = message.events().filter(Sequence::start_from(deleted_at)); tx.messages() .record_events(events.clone()) .await .fail("Failed to store events")?; tx.commit().await.fail(db::failed::COMMIT)?; self.events.broadcast_from(events); Ok(()) } else { Err(not_sender()) } } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 30 days. let expire_at = relative_to.to_owned() - TimeDelta::days(30); let mut tx = self.db.begin().await?; let expired = tx.messages().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); for message in expired { let deleted = tx.sequence().next(relative_to).await?; match message.delete(deleted) { Ok(message) => { let message_events = message .events() .filter(Sequence::start_from(deleted.sequence)); tx.messages().record_events(message_events.clone()).await?; events.push(message_events); } Err(history::DeleteError::Deleted(_)) => {} } } tx.commit().await?; self.events .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); Ok(()) } pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, purge after 6 hours. let purge_at = relative_to.to_owned() - TimeDelta::hours(6); let mut tx = self.db.begin().await?; tx.messages().purge(&purge_at).await?; tx.commit().await?; Ok(()) } } impl
Messages
where
P: Publish,
{
pub async fn send(
&self,
conversation: &conversation::Id,
sender: &Login,
sent_at: &DateTime,
body: &Body,
) -> Result