use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, Stream, }; use sqlx::sqlite::SqlitePool; use super::{ broadcaster::Broadcaster, repo::message::Provider as _, types::{self, ChannelEvent}, }; use crate::{ channel::{self, repo::Provider as _}, clock::DateTime, db::NotFound as _, event::{repo::Provider as _, Sequence}, login::Login, }; pub struct Events<'a> { db: &'a SqlitePool, events: &'a Broadcaster, } impl<'a> Events<'a> { pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { Self { db, events } } pub async fn send( &self, login: &Login, channel: &channel::Id, body: &str, sent_at: &DateTime, ) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; let sent_sequence = tx.sequence().next().await?; let event = tx .message_events() .create(login, &channel, sent_at, sent_sequence, body) .await?; tx.commit().await?; self.events.broadcast(&event); Ok(event) } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); let mut tx = self.db.begin().await?; let expired = tx.message_events().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); for (channel, message) in expired { let deleted_sequence = tx.sequence().next().await?; let event = tx .message_events() .delete(&channel, &message, relative_to, deleted_sequence) .await?; events.push(event); } tx.commit().await?; for event in events { self.events.broadcast(&event); } Ok(()) } pub async fn subscribe( &self, resume_at: Option, ) -> Result + std::fmt::Debug, sqlx::Error> { // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); let mut tx = self.db.begin().await?; let channels = tx.channels().replay(resume_at).await?; let channel_events = channels .into_iter() .map(ChannelEvent::created) .filter(move |event| resume_at.map_or(true, |resume_at| event.sequence > resume_at)); let message_events = tx.message_events().replay(resume_at).await?; let mut replay_events = channel_events .into_iter() .chain(message_events.into_iter()) .collect::>(); replay_events.sort_by_key(|event| event.sequence); let resume_live_at = replay_events.last().map(|event| event.sequence); let replay = stream::iter(replay_events); // no skip_expired or resume transforms for stored_messages, as it's // constructed not to contain messages meeting either criterion. // // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; // * resume is redundant with the resume_at argument to // `tx.broadcasts().replay(…)`. let live_messages = live_messages // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // stored_messages. .filter(Self::resume(resume_live_at)); Ok(replay.chain(live_messages)) } fn resume( resume_at: Option, ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready { move |event| future::ready(resume_at < Some(event.sequence)) } } #[derive(Debug, thiserror::Error)] pub enum EventsError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), #[error(transparent)] DatabaseError(#[from] sqlx::Error), }