use std::collections::BTreeMap; use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, Stream, }; use sqlx::sqlite::SqlitePool; use super::{ broadcaster::Broadcaster, repo::message::Provider as _, types::{self, ChannelEvent, ResumePoint}, }; use crate::{ clock::DateTime, repo::{ channel::{self, Provider as _}, error::NotFound as _, login::Login, }, }; pub struct Events<'a> { db: &'a SqlitePool, broadcaster: &'a Broadcaster, } impl<'a> Events<'a> { pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { Self { db, broadcaster } } pub async fn send( &self, login: &Login, channel: &channel::Id, body: &str, sent_at: &DateTime, ) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; let event = tx .message_events() .create(login, &channel, body, sent_at) .await?; tx.commit().await?; self.broadcaster.broadcast(&event); Ok(event) } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); let mut tx = self.db.begin().await?; let expired = tx.message_events().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); for (channel, message) in expired { let sequence = tx.message_events().assign_sequence(&channel).await?; let event = tx .message_events() .delete_expired(&channel, &message, sequence, relative_to) .await?; events.push(event); } tx.commit().await?; for event in events { self.broadcaster.broadcast(&event); } Ok(()) } pub async fn subscribe( &self, resume_at: ResumePoint, ) -> Result + std::fmt::Debug, sqlx::Error> { let mut tx = self.db.begin().await?; let channels = tx.channels().all().await?; let created_events = { let resume_at = resume_at.clone(); let channels = channels.clone(); stream::iter( channels .into_iter() .map(ChannelEvent::created) .filter(move |event| resume_at.not_after(event)), ) }; // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.broadcaster.subscribe(); let mut replays = BTreeMap::new(); let mut resume_live_at = resume_at.clone(); for channel in channels { let replay = tx .message_events() .replay(&channel, resume_at.get(&channel.id)) .await?; if let Some(last) = replay.last() { resume_live_at.advance(last); } replays.insert(channel.id.clone(), replay); } let replay = stream::select_all(replays.into_values().map(stream::iter)); // no skip_expired or resume transforms for stored_messages, as it's // constructed not to contain messages meeting either criterion. // // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; // * resume is redundant with the resume_at argument to // `tx.broadcasts().replay(…)`. let live_messages = live_messages // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // stored_messages. .filter(Self::resume(resume_live_at)); Ok(created_events.chain(replay).chain(live_messages).scan( resume_at, |resume_point, event| { match event.data { types::ChannelEventData::Deleted(_) => resume_point.forget(&event), _ => resume_point.advance(&event), } let event = types::ResumableEvent(resume_point.clone(), event); future::ready(Some(event)) }, )) } fn resume( resume_at: ResumePoint, ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready { move |event| future::ready(resume_at.not_after(event)) } } #[derive(Debug, thiserror::Error)] pub enum EventsError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), #[error(transparent)] DatabaseError(#[from] sqlx::Error), }