summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-03 20:17:07 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-03 20:17:07 -0400
commit0a5599c60d20ccc2223779eeba5dc91a95ea0fe5 (patch)
treef7ce69ad18768ff53d8fa37d8eb9c6c575633f9e /src/channel/app.rs
parentec804134c33aedb001c426c5f42f43f53c47848f (diff)
Add endpoints for deleting channels and messages.
It is deliberate that the expire() functions do not use them. To avoid races, the transactions must be committed before events get sent, in both cases, which makes them structurally pretty different.
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs72
1 files changed, 61 insertions, 11 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 6ce826b..24be2ff 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -2,10 +2,12 @@ use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
+use super::{repo::Provider as _, Channel, Id};
use crate::{
- channel::{repo::Provider as _, Channel},
clock::DateTime,
- event::{broadcaster::Broadcaster, repo::Provider as _, Sequence},
+ db::NotFound,
+ event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced},
+ message::repo::Provider as _,
};
pub struct Channels<'a> {
@@ -28,9 +30,8 @@ impl<'a> Channels<'a> {
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
tx.commit().await?;
- for event in channel.events() {
- self.events.broadcast(event);
- }
+ self.events
+ .broadcast(channel.events().map(Event::from).collect::<Vec<_>>());
Ok(channel.snapshot())
}
@@ -53,6 +54,46 @@ impl<'a> Channels<'a> {
Ok(channels)
}
+ pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
+ let mut tx = self.db.begin().await?;
+
+ let channel = tx
+ .channels()
+ .by_id(channel)
+ .await
+ .not_found(|| DeleteError::NotFound(channel.clone()))?
+ .snapshot();
+
+ let mut events = Vec::new();
+
+ let messages = tx.messages().in_channel(&channel).await?;
+ for message in messages {
+ let deleted = tx.sequence().next(deleted_at).await?;
+ let message = tx.messages().delete(&message, &deleted).await?;
+ events.extend(
+ message
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from),
+ );
+ }
+
+ let deleted = tx.sequence().next(deleted_at).await?;
+ let channel = tx.channels().delete(&channel.id, &deleted).await?;
+ events.extend(
+ channel
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from),
+ );
+
+ tx.commit().await?;
+
+ self.events.broadcast(events);
+
+ Ok(())
+ }
+
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = relative_to.to_owned() - TimeDelta::days(90);
@@ -73,12 +114,13 @@ impl<'a> Channels<'a> {
tx.commit().await?;
- for event in events
- .into_iter()
- .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
- {
- self.events.broadcast(event);
- }
+ self.events.broadcast(
+ events
+ .into_iter()
+ .kmerge_by(|a, b| a.sequence() < b.sequence())
+ .map(Event::from)
+ .collect::<Vec<_>>(),
+ );
Ok(())
}
@@ -92,6 +134,14 @@ pub enum CreateError {
DatabaseError(#[from] sqlx::Error),
}
+#[derive(Debug, thiserror::Error)]
+pub enum DeleteError {
+ #[error("channel {0} not found")]
+ NotFound(Id),
+ #[error(transparent)]
+ DatabaseError(#[from] sqlx::Error),
+}
+
impl CreateError {
fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self {
if let Some(error) = error.as_database_error() {