summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs29
-rw-r--r--src/channel/routes/test/on_create.rs10
2 files changed, 35 insertions, 4 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 1eeca79..d7312e4 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,8 +1,9 @@
+use chrono::TimeDelta;
use sqlx::sqlite::SqlitePool;
use crate::{
clock::DateTime,
- events::{broadcaster::Broadcaster, types::ChannelEvent},
+ events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent},
repo::channel::{Channel, Provider as _},
};
@@ -38,6 +39,32 @@ impl<'a> Channels<'a> {
Ok(channels)
}
+
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, expire after 90 days.
+ let expire_at = relative_to.to_owned() - TimeDelta::days(90);
+
+ let mut tx = self.db.begin().await?;
+ let expired = tx.channels().expired(&expire_at).await?;
+
+ let mut events = Vec::with_capacity(expired.len());
+ for channel in expired {
+ let sequence = tx.message_events().assign_sequence(&channel).await?;
+ let event = tx
+ .channels()
+ .delete_expired(&channel, sequence, relative_to)
+ .await?;
+ events.push(event);
+ }
+
+ tx.commit().await?;
+
+ for event in events {
+ self.broadcaster.broadcast(&event);
+ }
+
+ Ok(())
+ }
}
#[derive(Debug, thiserror::Error)]
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index 5e62d7f..e2610a5 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -1,5 +1,5 @@
use axum::extract::{Json, State};
-use futures::{future, stream::StreamExt as _};
+use futures::stream::StreamExt as _;
use crate::{
channel::{app, routes},
@@ -41,7 +41,7 @@ async fn new_channel() {
.subscribe(types::ResumePoint::default())
.await
.expect("subscribing never fails")
- .filter(|types::ResumableEvent(_, event)| future::ready(event.channel == response_channel));
+ .filter(fixtures::filter::created());
let types::ResumableEvent(_, event) = events
.next()
@@ -50,7 +50,11 @@ async fn new_channel() {
.expect("creation event published");
assert_eq!(types::Sequence::default(), event.sequence);
- assert_eq!(types::ChannelEventData::Created, event.data);
+ assert!(matches!(
+ event.data,
+ types::ChannelEventData::Created(event)
+ if event.channel == response_channel
+ ));
}
#[tokio::test]