use std::collections::BTreeMap; use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, Stream, }; use sqlx::sqlite::SqlitePool; use super::{ broadcaster::Broadcaster, repo::message::Provider as _, types::{self, ChannelEvent, ResumePoint}, }; use crate::{ clock::DateTime, repo::{ channel::{self, Provider as _}, error::NotFound as _, login::Login, }, }; pub struct Events<'a> { db: &'a SqlitePool, broadcaster: &'a Broadcaster, } impl<'a> Events<'a> { pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { Self { db, broadcaster } } pub async fn send( &self, login: &Login, channel: &channel::Id, body: &str, sent_at: &DateTime, ) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; let event = tx .message_events() .create(login, &channel, body, sent_at) .await?; tx.commit().await?; self.broadcaster.broadcast(&event); Ok(event) } pub async fn subscribe( &self, subscribed_at: &DateTime, resume_at: ResumePoint, ) -> Result + std::fmt::Debug, sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); let mut tx = self.db.begin().await?; let channels = tx.channels().all().await?; let created_events = { let resume_at = resume_at.clone(); let channels = channels.clone(); stream::iter( channels .into_iter() .map(ChannelEvent::created) .filter(move |event| resume_at.not_after(event)), ) }; // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.broadcaster.subscribe(); tx.message_events().expire(&expire_at).await?; let mut replays = BTreeMap::new(); let mut resume_live_at = resume_at.clone(); for channel in channels { let replay = tx .message_events() .replay(&channel, resume_at.get(&channel.id)) .await?; if let Some(last) = replay.last() { resume_live_at.advance(&channel.id, last.sequence); } replays.insert(channel.id.clone(), replay); } let replay = stream::select_all(replays.into_values().map(stream::iter)); // no skip_expired or resume transforms for stored_messages, as it's // constructed not to contain messages meeting either criterion. // // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; // * resume is redundant with the resume_at argument to // `tx.broadcasts().replay(…)`. let live_messages = live_messages // Sure, it's temporally improbable that we'll ever skip a message // that's 90 days old, but there's no reason not to be thorough. .filter(Self::skip_expired(&expire_at)) // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // stored_messages. .filter(Self::resume(resume_live_at)); Ok(created_events.chain(replay).chain(live_messages).scan( resume_at, |resume_point, event| { let channel = &event.channel.id; let sequence = event.sequence; resume_point.advance(channel, sequence); let event = types::ResumableEvent(resume_point.clone(), event); future::ready(Some(event)) }, )) } fn resume( resume_at: ResumePoint, ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready { move |event| future::ready(resume_at.not_after(event)) } fn skip_expired( expire_at: &DateTime, ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready { let expire_at = expire_at.to_owned(); move |event| future::ready(expire_at < event.at) } } #[derive(Debug, thiserror::Error)] pub enum EventsError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), #[error(transparent)] DatabaseError(#[from] sqlx::Error), }