use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, Stream, }; use sqlx::sqlite::SqlitePool; use super::Broadcaster; use crate::{ clock::DateTime, events::repo::broadcast::{self, Provider as _}, repo::{ channel::{self, Provider as _}, error::NotFound as _, login::Login, }, }; pub struct Events<'a> { db: &'a SqlitePool, broadcaster: &'a Broadcaster, } impl<'a> Events<'a> { pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { Self { db, broadcaster } } pub async fn send( &self, login: &Login, channel: &channel::Id, body: &str, sent_at: &DateTime, ) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; let message = tx .broadcast() .create(login, &channel, body, sent_at) .await?; tx.commit().await?; self.broadcaster.broadcast(&channel.id, &message); Ok(message) } pub async fn subscribe( &self, channel: &channel::Id, subscribed_at: &DateTime, resume_at: Option, ) -> Result + std::fmt::Debug, EventsError> { // Somewhat arbitrarily, expire after 90 days. let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.broadcaster.subscribe(&channel.id); tx.broadcast().expire(&expire_at).await?; let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; tx.commit().await?; let resume_broadcast_at = stored_messages .last() .map(|message| message.sequence) .or(resume_at); // This should always be the case, up to integer rollover, primarily // because every message in stored_messages has a sequence not less // than `resume_at`, or `resume_at` is None. We use the last message // (if any) to decide when to resume the `live_messages` stream. // // It probably simplifies to assert!(resume_at <= resume_broadcast_at), but // this form captures more of the reasoning. assert!( (resume_at.is_none() && resume_broadcast_at.is_none()) || (stored_messages.is_empty() && resume_at == resume_broadcast_at) || resume_at < resume_broadcast_at ); // no skip_expired or resume transforms for stored_messages, as it's // constructed not to contain messages meeting either criterion. // // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; // * resume is redundant with the resume_at argument to // `tx.broadcasts().replay(…)`. let stored_messages = stream::iter(stored_messages); let live_messages = live_messages // Sure, it's temporally improbable that we'll ever skip a message // that's 90 days old, but there's no reason not to be thorough. .filter(Self::skip_expired(&expire_at)) // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // stored_messages. .filter(Self::resume(resume_broadcast_at)); Ok(stored_messages.chain(live_messages)) } fn resume( resume_at: Option, ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready { move |msg| { future::ready(match resume_at { None => true, Some(resume_at) => msg.sequence > resume_at, }) } } fn skip_expired( expire_at: &DateTime, ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready { let expire_at = expire_at.to_owned(); move |msg| future::ready(msg.sent_at > expire_at) } } #[derive(Debug, thiserror::Error)] pub enum EventsError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), #[error(transparent)] DatabaseError(#[from] sqlx::Error), }