diff options
| author | Kit La Touche <kit@transneptune.net> | 2024-09-28 21:55:50 -0400 |
|---|---|---|
| committer | Kit La Touche <kit@transneptune.net> | 2024-09-28 21:55:50 -0400 |
| commit | 897eef0306917baf3662e691b29f182d35805296 (patch) | |
| tree | 024e2a3fa13ac96e0b4339a6d62ae533efe7db07 /src/channel/app.rs | |
| parent | c524b333befc8cc97aa49f73b3ed28bc3b82420c (diff) | |
| parent | 4d0bb0709b168a24ab6a8dbc86da45d7503596ee (diff) | |
Merge branch 'main' into feature-frontend
Diffstat (limited to 'src/channel/app.rs')
| -rw-r--r-- | src/channel/app.rs | 38 |
1 files changed, 34 insertions, 4 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 793fa35..d7312e4 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,7 +1,9 @@ +use chrono::TimeDelta; use sqlx::sqlite::SqlitePool; use crate::{ - events::broadcaster::Broadcaster, + clock::DateTime, + events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent}, repo::channel::{Channel, Provider as _}, }; @@ -15,16 +17,18 @@ impl<'a> Channels<'a> { Self { db, broadcaster } } - pub async fn create(&self, name: &str) -> Result<Channel, CreateError> { + pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> { let mut tx = self.db.begin().await?; let channel = tx .channels() - .create(name) + .create(name, created_at) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; - self.broadcaster.register_channel(&channel.id); tx.commit().await?; + self.broadcaster + .broadcast(&ChannelEvent::created(channel.clone())); + Ok(channel) } @@ -35,6 +39,32 @@ impl<'a> Channels<'a> { Ok(channels) } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + let expired = tx.channels().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for channel in expired { + let sequence = tx.message_events().assign_sequence(&channel).await?; + let event = tx + .channels() + .delete_expired(&channel, sequence, relative_to) + .await?; + events.push(event); + } + + tx.commit().await?; + + for event in events { + self.broadcaster.broadcast(&event); + } + + Ok(()) + } } #[derive(Debug, thiserror::Error)] |
