diff options
Diffstat (limited to 'src/events/app.rs')
| -rw-r--r-- | src/events/app.rs | 163 |
1 files changed, 0 insertions, 163 deletions
diff --git a/src/events/app.rs b/src/events/app.rs deleted file mode 100644 index db7f430..0000000 --- a/src/events/app.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::collections::BTreeMap; - -use chrono::TimeDelta; -use futures::{ - future, - stream::{self, StreamExt as _}, - Stream, -}; -use sqlx::sqlite::SqlitePool; - -use super::{ - broadcaster::Broadcaster, - repo::message::Provider as _, - types::{self, ChannelEvent, ResumePoint}, -}; -use crate::{ - clock::DateTime, - repo::{ - channel::{self, Provider as _}, - error::NotFound as _, - login::Login, - }, -}; - -pub struct Events<'a> { - db: &'a SqlitePool, - events: &'a Broadcaster, -} - -impl<'a> Events<'a> { - pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { - Self { db, events } - } - - pub async fn send( - &self, - login: &Login, - channel: &channel::Id, - body: &str, - sent_at: &DateTime, - ) -> Result<types::ChannelEvent, EventsError> { - let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let event = tx - .message_events() - .create(login, &channel, body, sent_at) - .await?; - tx.commit().await?; - - self.events.broadcast(&event); - Ok(event) - } - - pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { - // Somewhat arbitrarily, expire after 90 days. - let expire_at = relative_to.to_owned() - TimeDelta::days(90); - - let mut tx = self.db.begin().await?; - let expired = tx.message_events().expired(&expire_at).await?; - - let mut events = Vec::with_capacity(expired.len()); - for (channel, message) in expired { - let sequence = tx.message_events().assign_sequence(&channel).await?; - let event = tx - .message_events() - .delete_expired(&channel, &message, sequence, relative_to) - .await?; - events.push(event); - } - - tx.commit().await?; - - for event in events { - self.events.broadcast(&event); - } - - Ok(()) - } - - pub async fn subscribe( - &self, - resume_at: ResumePoint, - ) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> { - let mut tx = self.db.begin().await?; - let channels = tx.channels().all().await?; - - let created_events = { - let resume_at = resume_at.clone(); - let channels = channels.clone(); - stream::iter( - channels - .into_iter() - .map(ChannelEvent::created) - .filter(move |event| resume_at.not_after(event)), - ) - }; - - // Subscribe before retrieving, to catch messages broadcast while we're - // querying the DB. We'll prune out duplicates later. - let live_messages = self.events.subscribe(); - - let mut replays = BTreeMap::new(); - let mut resume_live_at = resume_at.clone(); - for channel in channels { - let replay = tx - .message_events() - .replay(&channel, resume_at.get(&channel.id)) - .await?; - - if let Some(last) = replay.last() { - resume_live_at.advance(last); - } - - replays.insert(channel.id.clone(), replay); - } - - let replay = stream::select_all(replays.into_values().map(stream::iter)); - - // no skip_expired or resume transforms for stored_messages, as it's - // constructed not to contain messages meeting either criterion. - // - // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; - // * resume is redundant with the resume_at argument to - // `tx.broadcasts().replay(…)`. - let live_messages = live_messages - // Filtering on the broadcast resume point filters out messages - // before resume_at, and filters out messages duplicated from - // stored_messages. - .filter(Self::resume(resume_live_at)); - - Ok(created_events.chain(replay).chain(live_messages).scan( - resume_at, - |resume_point, event| { - match event.data { - types::ChannelEventData::Deleted(_) => resume_point.forget(&event), - _ => resume_point.advance(&event), - } - - let event = types::ResumableEvent(resume_point.clone(), event); - - future::ready(Some(event)) - }, - )) - } - - fn resume( - resume_at: ResumePoint, - ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { - move |event| future::ready(resume_at.not_after(event)) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum EventsError { - #[error("channel {0} not found")] - ChannelNotFound(channel::Id), - #[error(transparent)] - DatabaseError(#[from] sqlx::Error), -} |
