use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{repo::Provider as _, Channel, Id}; use crate::{ clock::DateTime, db::NotFound, event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, message::repo::Provider as _, }; pub struct Channels<'a> { db: &'a SqlitePool, events: &'a Broadcaster, } impl<'a> Channels<'a> { pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { Self { db, events } } pub async fn create(&self, name: &str, created_at: &DateTime) -> Result { let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; let channel = tx .channels() .create(name, &created) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; self.events .broadcast(channel.events().map(Event::from).collect::>()); Ok(channel.snapshot()) } pub async fn all(&self, resume_point: Option) -> Result, InternalError> { let mut tx = self.db.begin().await?; let channels = tx.channels().all(resume_point).await?; tx.commit().await?; let channels = channels .into_iter() .filter_map(|channel| { channel .events() .filter(Sequence::up_to(resume_point)) .collect() }) .collect(); Ok(channels) } pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| DeleteError::NotFound(channel.clone()))? .snapshot(); let mut events = Vec::new(); let messages = tx.messages().in_channel(&channel).await?; for message in messages { let deleted = tx.sequence().next(deleted_at).await?; let message = tx.messages().delete(&message, &deleted).await?; events.extend( message .events() .filter(Sequence::start_from(deleted.sequence)) .map(Event::from), ); } let deleted = tx.sequence().next(deleted_at).await?; let channel = tx.channels().delete(&channel.id, &deleted).await?; events.extend( channel .events() .filter(Sequence::start_from(deleted.sequence)) .map(Event::from), ); tx.commit().await?; self.events.broadcast(events); Ok(()) } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); let mut tx = self.db.begin().await?; let expired = tx.channels().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); for channel in expired { let deleted = tx.sequence().next(relative_to).await?; let channel = tx.channels().delete(&channel, &deleted).await?; events.push( channel .events() .filter(Sequence::start_from(deleted.sequence)), ); } tx.commit().await?; self.events.broadcast( events .into_iter() .kmerge_by(|a, b| a.sequence() < b.sequence()) .map(Event::from) .collect::>(), ); Ok(()) } } #[derive(Debug, thiserror::Error)] pub enum CreateError { #[error("channel named {0} already exists")] DuplicateName(String), #[error(transparent)] DatabaseError(#[from] sqlx::Error), } #[derive(Debug, thiserror::Error)] pub enum DeleteError { #[error("channel {0} not found")] NotFound(Id), #[error(transparent)] DatabaseError(#[from] sqlx::Error), } impl CreateError { fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self { if let Some(error) = error.as_database_error() { if error.is_unique_violation() { return Self::DuplicateName(name.into()); } } Self::from(error) } } #[derive(Debug, thiserror::Error)] pub enum InternalError { #[error(transparent)] DatabaseError(#[from] sqlx::Error), }