summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs72
1 files changed, 61 insertions, 11 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 6ce826b..24be2ff 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -2,10 +2,12 @@ use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
+use super::{repo::Provider as _, Channel, Id};
use crate::{
- channel::{repo::Provider as _, Channel},
clock::DateTime,
- event::{broadcaster::Broadcaster, repo::Provider as _, Sequence},
+ db::NotFound,
+ event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced},
+ message::repo::Provider as _,
};
pub struct Channels<'a> {
@@ -28,9 +30,8 @@ impl<'a> Channels<'a> {
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
tx.commit().await?;
- for event in channel.events() {
- self.events.broadcast(event);
- }
+ self.events
+ .broadcast(channel.events().map(Event::from).collect::<Vec<_>>());
Ok(channel.snapshot())
}
@@ -53,6 +54,46 @@ impl<'a> Channels<'a> {
Ok(channels)
}
+ pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
+ let mut tx = self.db.begin().await?;
+
+ let channel = tx
+ .channels()
+ .by_id(channel)
+ .await
+ .not_found(|| DeleteError::NotFound(channel.clone()))?
+ .snapshot();
+
+ let mut events = Vec::new();
+
+ let messages = tx.messages().in_channel(&channel).await?;
+ for message in messages {
+ let deleted = tx.sequence().next(deleted_at).await?;
+ let message = tx.messages().delete(&message, &deleted).await?;
+ events.extend(
+ message
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from),
+ );
+ }
+
+ let deleted = tx.sequence().next(deleted_at).await?;
+ let channel = tx.channels().delete(&channel.id, &deleted).await?;
+ events.extend(
+ channel
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from),
+ );
+
+ tx.commit().await?;
+
+ self.events.broadcast(events);
+
+ Ok(())
+ }
+
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = relative_to.to_owned() - TimeDelta::days(90);
@@ -73,12 +114,13 @@ impl<'a> Channels<'a> {
tx.commit().await?;
- for event in events
- .into_iter()
- .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
- {
- self.events.broadcast(event);
- }
+ self.events.broadcast(
+ events
+ .into_iter()
+ .kmerge_by(|a, b| a.sequence() < b.sequence())
+ .map(Event::from)
+ .collect::<Vec<_>>(),
+ );
Ok(())
}
@@ -92,6 +134,14 @@ pub enum CreateError {
DatabaseError(#[from] sqlx::Error),
}
+#[derive(Debug, thiserror::Error)]
+pub enum DeleteError {
+ #[error("channel {0} not found")]
+ NotFound(Id),
+ #[error(transparent)]
+ DatabaseError(#[from] sqlx::Error),
+}
+
impl CreateError {
fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self {
if let Some(error) = error.as_database_error() {