summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs30
1 files changed, 23 insertions, 7 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 46eaba8..0409076 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -23,9 +23,9 @@ impl<'a> Channels<'a> {
pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> {
let mut tx = self.db.begin().await?;
let created = tx.sequence().next(created_at).await?;
- let channel = tx
- .channels()
- .create(name, &created)
+ let channel = tx.channels().create(name, &created).await?;
+ tx.channels()
+ .reserve_name(&channel, name)
.await
.duplicate(|| CreateError::DuplicateName(name.into()))?;
tx.commit().await?;
@@ -43,7 +43,7 @@ impl<'a> Channels<'a> {
let channel = tx.channels().by_id(channel).await.optional()?;
tx.commit().await?;
- Ok(channel.iter().flat_map(History::events).collect())
+ Ok(channel.as_ref().and_then(History::as_snapshot))
}
pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
@@ -54,13 +54,16 @@ impl<'a> Channels<'a> {
.by_id(channel)
.await
.not_found(|| Error::NotFound(channel.clone()))?;
+ channel
+ .as_snapshot()
+ .ok_or_else(|| Error::Deleted(channel.id().clone()))?;
let mut events = Vec::new();
- let messages = tx.messages().in_channel(&channel, None).await?;
+ let messages = tx.messages().live(&channel).await?;
for message in messages {
let deleted = tx.sequence().next(deleted_at).await?;
- let message = tx.messages().delete(message.id(), &deleted).await?;
+ let message = tx.messages().delete(&message, &deleted).await?;
events.extend(
message
.events()
@@ -70,7 +73,7 @@ impl<'a> Channels<'a> {
}
let deleted = tx.sequence().next(deleted_at).await?;
- let channel = tx.channels().delete(channel.id(), &deleted).await?;
+ let channel = tx.channels().delete(&channel, &deleted).await?;
events.extend(
channel
.events()
@@ -115,6 +118,17 @@ impl<'a> Channels<'a> {
Ok(())
}
+
+ pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, purge after 7 days.
+ let purge_at = relative_to.to_owned() - TimeDelta::days(7);
+
+ let mut tx = self.db.begin().await?;
+ tx.channels().purge(&purge_at).await?;
+ tx.commit().await?;
+
+ Ok(())
+ }
}
#[derive(Debug, thiserror::Error)]
@@ -129,6 +143,8 @@ pub enum CreateError {
pub enum Error {
#[error("channel {0} not found")]
NotFound(Id),
+ #[error("channel {0} deleted")]
+ Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
}