diff options
Diffstat (limited to 'src/channel/app.rs')
| -rw-r--r-- | src/channel/app.rs | 89 |
1 files changed, 75 insertions, 14 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 5d6cada..7bfa0f7 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,12 +2,16 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{repo::Provider as _, Channel, History, Id}; +use super::{ + repo::{LoadError, Provider as _}, + Channel, History, Id, +}; use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, event::{repo::Provider as _, Broadcaster, Event, Sequence}, message::repo::Provider as _, + name::{self, Name}, }; pub struct Channels<'a> { @@ -20,14 +24,14 @@ impl<'a> Channels<'a> { Self { db, events } } - pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> { + pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> { let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; let channel = tx .channels() .create(name, &created) .await - .duplicate(|| CreateError::DuplicateName(name.into()))?; + .duplicate(|| CreateError::DuplicateName(name.clone()))?; tx.commit().await?; self.events @@ -38,12 +42,12 @@ impl<'a> Channels<'a> { // This function is careless with respect to time, and gets you the channel as // it exists in the specific moment when you call it. - pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, sqlx::Error> { + pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, Error> { let mut tx = self.db.begin().await?; let channel = tx.channels().by_id(channel).await.optional()?; tx.commit().await?; - Ok(channel.iter().flat_map(History::events).collect()) + Ok(channel.as_ref().and_then(History::as_snapshot)) } pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { @@ -54,13 +58,16 @@ impl<'a> Channels<'a> { .by_id(channel) .await .not_found(|| Error::NotFound(channel.clone()))?; + channel + .as_snapshot() + .ok_or_else(|| Error::Deleted(channel.id().clone()))?; let mut events = Vec::new(); - let messages = tx.messages().in_channel(&channel, None).await?; + let messages = tx.messages().live(&channel).await?; for message in messages { let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(message.id(), &deleted).await?; + let message = tx.messages().delete(&message, &deleted).await?; events.extend( message .events() @@ -70,7 +77,7 @@ impl<'a> Channels<'a> { } let deleted = tx.sequence().next(deleted_at).await?; - let channel = tx.channels().delete(channel.id(), &deleted).await?; + let channel = tx.channels().delete(&channel, &deleted).await?; events.extend( channel .events() @@ -85,7 +92,7 @@ impl<'a> Channels<'a> { Ok(()) } - pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); @@ -115,26 +122,80 @@ impl<'a> Channels<'a> { Ok(()) } + + pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, purge after 7 days. + let purge_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + tx.channels().purge(&purge_at).await?; + tx.commit().await?; + + Ok(()) + } + + pub async fn recanonicalize(&self) -> Result<(), sqlx::Error> { + let mut tx = self.db.begin().await?; + tx.channels().recanonicalize().await?; + tx.commit().await?; + + Ok(()) + } } #[derive(Debug, thiserror::Error)] pub enum CreateError { #[error("channel named {0} already exists")] - DuplicateName(String), + DuplicateName(Name), + #[error(transparent)] + Database(#[from] sqlx::Error), #[error(transparent)] - DatabaseError(#[from] sqlx::Error), + Name(#[from] name::Error), +} + +impl From<LoadError> for CreateError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } } #[derive(Debug, thiserror::Error)] pub enum Error { #[error("channel {0} not found")] NotFound(Id), + #[error("channel {0} deleted")] + Deleted(Id), #[error(transparent)] - DatabaseError(#[from] sqlx::Error), + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for Error { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } } #[derive(Debug, thiserror::Error)] -pub enum InternalError { +pub enum ExpireError { #[error(transparent)] - DatabaseError(#[from] sqlx::Error), + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<LoadError> for ExpireError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } } |
