summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs224
1 files changed, 0 insertions, 224 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
deleted file mode 100644
index e3b169c..0000000
--- a/src/channel/app.rs
+++ /dev/null
@@ -1,224 +0,0 @@
-use chrono::TimeDelta;
-use itertools::Itertools;
-use sqlx::sqlite::SqlitePool;
-
-use super::{
- Channel, Id,
- repo::{LoadError, Provider as _},
- validate,
-};
-use crate::{
- clock::DateTime,
- db::{Duplicate as _, NotFound as _},
- event::{Broadcaster, Event, Sequence, repo::Provider as _},
- message::{self, repo::Provider as _},
- name::{self, Name},
-};
-
-pub struct Channels<'a> {
- db: &'a SqlitePool,
- events: &'a Broadcaster,
-}
-
-impl<'a> Channels<'a> {
- pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self {
- Self { db, events }
- }
-
- pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> {
- if !validate::name(name) {
- return Err(CreateError::InvalidName(name.clone()));
- }
-
- let mut tx = self.db.begin().await?;
- let created = tx.sequence().next(created_at).await?;
- let channel = tx
- .channels()
- .create(name, &created)
- .await
- .duplicate(|| CreateError::DuplicateName(name.clone()))?;
- tx.commit().await?;
-
- self.events
- .broadcast(channel.events().map(Event::from).collect::<Vec<_>>());
-
- Ok(channel.as_created())
- }
-
- // This function is careless with respect to time, and gets you the channel as
- // it exists in the specific moment when you call it.
- pub async fn get(&self, channel: &Id) -> Result<Channel, Error> {
- let to_not_found = || Error::NotFound(channel.clone());
- let to_deleted = || Error::Deleted(channel.clone());
-
- let mut tx = self.db.begin().await?;
- let channel = tx.channels().by_id(channel).await.not_found(to_not_found)?;
- tx.commit().await?;
-
- channel.as_snapshot().ok_or_else(to_deleted)
- }
-
- pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
- let mut tx = self.db.begin().await?;
-
- let channel = tx
- .channels()
- .by_id(channel)
- .await
- .not_found(|| DeleteError::NotFound(channel.clone()))?;
- channel
- .as_snapshot()
- .ok_or_else(|| DeleteError::Deleted(channel.id().clone()))?;
-
- let mut events = Vec::new();
-
- let messages = tx.messages().live(&channel).await?;
- let has_messages = messages
- .iter()
- .map(message::History::as_snapshot)
- .any(|message| message.is_some());
- if has_messages {
- return Err(DeleteError::NotEmpty(channel.id().clone()));
- }
-
- let deleted = tx.sequence().next(deleted_at).await?;
- let channel = tx.channels().delete(&channel, &deleted).await?;
- events.extend(
- channel
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from),
- );
-
- tx.commit().await?;
-
- self.events.broadcast(events);
-
- Ok(())
- }
-
- pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> {
- // Somewhat arbitrarily, expire after 7 days. Active channels will not be
- // expired until their messages expire.
- let expire_at = relative_to.to_owned() - TimeDelta::days(7);
-
- let mut tx = self.db.begin().await?;
- let expired = tx.channels().expired(&expire_at).await?;
-
- let mut events = Vec::with_capacity(expired.len());
- for channel in expired {
- let deleted = tx.sequence().next(relative_to).await?;
- let channel = tx.channels().delete(&channel, &deleted).await?;
- events.push(
- channel
- .events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
- }
-
- tx.commit().await?;
-
- self.events.broadcast(
- events
- .into_iter()
- .kmerge_by(Sequence::merge)
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
-
- Ok(())
- }
-
- pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
- // Somewhat arbitrarily, purge after 6 hours.
- let purge_at = relative_to.to_owned() - TimeDelta::hours(6);
-
- let mut tx = self.db.begin().await?;
- tx.channels().purge(&purge_at).await?;
- tx.commit().await?;
-
- Ok(())
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum CreateError {
- #[error("channel named {0} already exists")]
- DuplicateName(Name),
- #[error("invalid channel name: {0}")]
- InvalidName(Name),
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for CreateError {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum Error {
- #[error("channel {0} not found")]
- NotFound(Id),
- #[error("channel {0} deleted")]
- Deleted(Id),
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for Error {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum DeleteError {
- #[error("channel {0} not found")]
- NotFound(Id),
- #[error("channel {0} deleted")]
- Deleted(Id),
- #[error("channel {0} not empty")]
- NotEmpty(Id),
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for DeleteError {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum ExpireError {
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for ExpireError {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}