diff options
Diffstat (limited to 'src/events/app.rs')
| -rw-r--r-- | src/events/app.rs | 140 |
1 files changed, 0 insertions, 140 deletions
diff --git a/src/events/app.rs b/src/events/app.rs deleted file mode 100644 index 1fa2f70..0000000 --- a/src/events/app.rs +++ /dev/null @@ -1,140 +0,0 @@ -use chrono::TimeDelta; -use futures::{ - future, - stream::{self, StreamExt as _}, - Stream, -}; -use sqlx::sqlite::SqlitePool; - -use super::{ - broadcaster::Broadcaster, - repo::message::Provider as _, - types::{self, ChannelEvent}, -}; -use crate::{ - channel, - clock::DateTime, - repo::{ - channel::Provider as _, - error::NotFound as _, - login::Login, - sequence::{Provider as _, Sequence}, - }, -}; - -pub struct Events<'a> { - db: &'a SqlitePool, - events: &'a Broadcaster, -} - -impl<'a> Events<'a> { - pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { - Self { db, events } - } - - pub async fn send( - &self, - login: &Login, - channel: &channel::Id, - body: &str, - sent_at: &DateTime, - ) -> Result<types::ChannelEvent, EventsError> { - let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let sent_sequence = tx.sequence().next().await?; - let event = tx - .message_events() - .create(login, &channel, sent_at, sent_sequence, body) - .await?; - tx.commit().await?; - - self.events.broadcast(&event); - Ok(event) - } - - pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { - // Somewhat arbitrarily, expire after 90 days. - let expire_at = relative_to.to_owned() - TimeDelta::days(90); - - let mut tx = self.db.begin().await?; - let expired = tx.message_events().expired(&expire_at).await?; - - let mut events = Vec::with_capacity(expired.len()); - for (channel, message) in expired { - let deleted_sequence = tx.sequence().next().await?; - let event = tx - .message_events() - .delete(&channel, &message, relative_to, deleted_sequence) - .await?; - events.push(event); - } - - tx.commit().await?; - - for event in events { - self.events.broadcast(&event); - } - - Ok(()) - } - - pub async fn subscribe( - &self, - resume_at: Option<Sequence>, - ) -> Result<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug, sqlx::Error> { - // Subscribe before retrieving, to catch messages broadcast while we're - // querying the DB. We'll prune out duplicates later. - let live_messages = self.events.subscribe(); - - let mut tx = self.db.begin().await?; - let channels = tx.channels().replay(resume_at).await?; - - let channel_events = channels - .into_iter() - .map(ChannelEvent::created) - .filter(move |event| resume_at.map_or(true, |resume_at| event.sequence > resume_at)); - - let message_events = tx.message_events().replay(resume_at).await?; - - let mut replay_events = channel_events - .into_iter() - .chain(message_events.into_iter()) - .collect::<Vec<_>>(); - replay_events.sort_by_key(|event| event.sequence); - let resume_live_at = replay_events.last().map(|event| event.sequence); - - let replay = stream::iter(replay_events); - - // no skip_expired or resume transforms for stored_messages, as it's - // constructed not to contain messages meeting either criterion. - // - // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; - // * resume is redundant with the resume_at argument to - // `tx.broadcasts().replay(…)`. - let live_messages = live_messages - // Filtering on the broadcast resume point filters out messages - // before resume_at, and filters out messages duplicated from - // stored_messages. - .filter(Self::resume(resume_live_at)); - - Ok(replay.chain(live_messages)) - } - - fn resume( - resume_at: Option<Sequence>, - ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { - move |event| future::ready(resume_at < Some(event.sequence)) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum EventsError { - #[error("channel {0} not found")] - ChannelNotFound(channel::Id), - #[error(transparent)] - DatabaseError(#[from] sqlx::Error), -} |
