diff options
Diffstat (limited to 'src/message/app.rs')
| -rw-r--r-- | src/message/app.rs | 115 |
1 files changed, 115 insertions, 0 deletions
diff --git a/src/message/app.rs b/src/message/app.rs new file mode 100644 index 0000000..33ea8ad --- /dev/null +++ b/src/message/app.rs @@ -0,0 +1,115 @@ +use chrono::TimeDelta; +use itertools::Itertools; +use sqlx::sqlite::SqlitePool; + +use super::{repo::Provider as _, Id, Message}; +use crate::{ + channel::{self, repo::Provider as _}, + clock::DateTime, + db::NotFound as _, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, + login::Login, +}; + +pub struct Messages<'a> { + db: &'a SqlitePool, + events: &'a Broadcaster, +} + +impl<'a> Messages<'a> { + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } + } + + pub async fn send( + &self, + channel: &channel::Id, + sender: &Login, + sent_at: &DateTime, + body: &str, + ) -> Result<Message, SendError> { + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| SendError::ChannelNotFound(channel.clone()))?; + let sent = tx.sequence().next(sent_at).await?; + let message = tx + .messages() + .create(&channel.snapshot(), sender, &sent, body) + .await?; + tx.commit().await?; + + self.events + .broadcast(message.events().map(Event::from).collect::<Vec<_>>()); + + Ok(message.snapshot()) + } + + pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + let deleted = tx.sequence().next(deleted_at).await?; + let message = tx.messages().delete(message, &deleted).await?; + tx.commit().await?; + + self.events.broadcast( + message + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from) + .collect::<Vec<_>>(), + ); + + Ok(()) + } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + + let expired = tx.messages().expired(&expire_at).await?; + let mut events = Vec::with_capacity(expired.len()); + for message in expired { + let deleted = tx.sequence().next(relative_to).await?; + let message = tx.messages().delete(&message, &deleted).await?; + events.push( + message + .events() + .filter(Sequence::start_from(deleted.sequence)), + ); + } + + tx.commit().await?; + + self.events.broadcast( + events + .into_iter() + .kmerge_by(Sequence::merge) + .map(Event::from) + .collect::<Vec<_>>(), + ); + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum SendError { + #[error("channel {0} not found")] + ChannelNotFound(channel::Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("channel {0} not found")] + ChannelNotFound(channel::Id), + #[error("message {0} not found")] + NotFound(Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} |
