diff options
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 29 | ||||
| -rw-r--r-- | src/channel/routes/test/on_create.rs | 10 |
2 files changed, 35 insertions, 4 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 1eeca79..d7312e4 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,8 +1,9 @@ +use chrono::TimeDelta; use sqlx::sqlite::SqlitePool; use crate::{ clock::DateTime, - events::{broadcaster::Broadcaster, types::ChannelEvent}, + events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent}, repo::channel::{Channel, Provider as _}, }; @@ -38,6 +39,32 @@ impl<'a> Channels<'a> { Ok(channels) } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + let expired = tx.channels().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for channel in expired { + let sequence = tx.message_events().assign_sequence(&channel).await?; + let event = tx + .channels() + .delete_expired(&channel, sequence, relative_to) + .await?; + events.push(event); + } + + tx.commit().await?; + + for event in events { + self.broadcaster.broadcast(&event); + } + + Ok(()) + } } #[derive(Debug, thiserror::Error)] diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 5e62d7f..e2610a5 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -1,5 +1,5 @@ use axum::extract::{Json, State}; -use futures::{future, stream::StreamExt as _}; +use futures::stream::StreamExt as _; use crate::{ channel::{app, routes}, @@ -41,7 +41,7 @@ async fn new_channel() { .subscribe(types::ResumePoint::default()) .await .expect("subscribing never fails") - .filter(|types::ResumableEvent(_, event)| future::ready(event.channel == response_channel)); + .filter(fixtures::filter::created()); let types::ResumableEvent(_, event) = events .next() @@ -50,7 +50,11 @@ async fn new_channel() { .expect("creation event published"); assert_eq!(types::Sequence::default(), event.sequence); - assert_eq!(types::ChannelEventData::Created, event.data); + assert!(matches!( + event.data, + types::ChannelEventData::Created(event) + if event.channel == response_channel + )); } #[tokio::test] |
