diff options
Diffstat (limited to 'src/channel/app.rs')
| -rw-r--r-- | src/channel/app.rs | 224 |
1 files changed, 0 insertions, 224 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs deleted file mode 100644 index e3b169c..0000000 --- a/src/channel/app.rs +++ /dev/null @@ -1,224 +0,0 @@ -use chrono::TimeDelta; -use itertools::Itertools; -use sqlx::sqlite::SqlitePool; - -use super::{ - Channel, Id, - repo::{LoadError, Provider as _}, - validate, -}; -use crate::{ - clock::DateTime, - db::{Duplicate as _, NotFound as _}, - event::{Broadcaster, Event, Sequence, repo::Provider as _}, - message::{self, repo::Provider as _}, - name::{self, Name}, -}; - -pub struct Channels<'a> { - db: &'a SqlitePool, - events: &'a Broadcaster, -} - -impl<'a> Channels<'a> { - pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { - Self { db, events } - } - - pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> { - if !validate::name(name) { - return Err(CreateError::InvalidName(name.clone())); - } - - let mut tx = self.db.begin().await?; - let created = tx.sequence().next(created_at).await?; - let channel = tx - .channels() - .create(name, &created) - .await - .duplicate(|| CreateError::DuplicateName(name.clone()))?; - tx.commit().await?; - - self.events - .broadcast(channel.events().map(Event::from).collect::<Vec<_>>()); - - Ok(channel.as_created()) - } - - // This function is careless with respect to time, and gets you the channel as - // it exists in the specific moment when you call it. - pub async fn get(&self, channel: &Id) -> Result<Channel, Error> { - let to_not_found = || Error::NotFound(channel.clone()); - let to_deleted = || Error::Deleted(channel.clone()); - - let mut tx = self.db.begin().await?; - let channel = tx.channels().by_id(channel).await.not_found(to_not_found)?; - tx.commit().await?; - - channel.as_snapshot().ok_or_else(to_deleted) - } - - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { - let mut tx = self.db.begin().await?; - - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| DeleteError::NotFound(channel.clone()))?; - channel - .as_snapshot() - .ok_or_else(|| DeleteError::Deleted(channel.id().clone()))?; - - let mut events = Vec::new(); - - let messages = tx.messages().live(&channel).await?; - let has_messages = messages - .iter() - .map(message::History::as_snapshot) - .any(|message| message.is_some()); - if has_messages { - return Err(DeleteError::NotEmpty(channel.id().clone())); - } - - let deleted = tx.sequence().next(deleted_at).await?; - let channel = tx.channels().delete(&channel, &deleted).await?; - events.extend( - channel - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from), - ); - - tx.commit().await?; - - self.events.broadcast(events); - - Ok(()) - } - - pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> { - // Somewhat arbitrarily, expire after 7 days. Active channels will not be - // expired until their messages expire. - let expire_at = relative_to.to_owned() - TimeDelta::days(7); - - let mut tx = self.db.begin().await?; - let expired = tx.channels().expired(&expire_at).await?; - - let mut events = Vec::with_capacity(expired.len()); - for channel in expired { - let deleted = tx.sequence().next(relative_to).await?; - let channel = tx.channels().delete(&channel, &deleted).await?; - events.push( - channel - .events() - .filter(Sequence::start_from(deleted.sequence)), - ); - } - - tx.commit().await?; - - self.events.broadcast( - events - .into_iter() - .kmerge_by(Sequence::merge) - .map(Event::from) - .collect::<Vec<_>>(), - ); - - Ok(()) - } - - pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { - // Somewhat arbitrarily, purge after 6 hours. - let purge_at = relative_to.to_owned() - TimeDelta::hours(6); - - let mut tx = self.db.begin().await?; - tx.channels().purge(&purge_at).await?; - tx.commit().await?; - - Ok(()) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum CreateError { - #[error("channel named {0} already exists")] - DuplicateName(Name), - #[error("invalid channel name: {0}")] - InvalidName(Name), - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From<LoadError> for CreateError { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("channel {0} not found")] - NotFound(Id), - #[error("channel {0} deleted")] - Deleted(Id), - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From<LoadError> for Error { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum DeleteError { - #[error("channel {0} not found")] - NotFound(Id), - #[error("channel {0} deleted")] - Deleted(Id), - #[error("channel {0} not empty")] - NotEmpty(Id), - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From<LoadError> for DeleteError { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum ExpireError { - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Name(#[from] name::Error), -} - -impl From<LoadError> for ExpireError { - fn from(error: LoadError) -> Self { - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} |
