use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, Stream, }; use sqlx::sqlite::SqlitePool; use crate::{ clock::DateTime, events::{ app::Broadcaster, repo::broadcast::{self, Provider as _}, }, repo::{ channel::{self, Channel, Provider as _}, error::NotFound as _, login::Login, }, }; pub struct Channels<'a> { db: &'a SqlitePool, broadcaster: &'a Broadcaster, } impl<'a> Channels<'a> { pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { Self { db, broadcaster } } pub async fn create(&self, name: &str) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() .create(name) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; self.broadcaster.register_channel(&channel.id); tx.commit().await?; Ok(channel) } pub async fn all(&self) -> Result, InternalError> { let mut tx = self.db.begin().await?; let channels = tx.channels().all().await?; tx.commit().await?; Ok(channels) } pub async fn send( &self, login: &Login, channel: &channel::Id, body: &str, sent_at: &DateTime, ) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; let message = tx .broadcast() .create(login, &channel, body, sent_at) .await?; tx.commit().await?; self.broadcaster.broadcast(&channel.id, &message); Ok(message) } pub async fn events( &self, channel: &channel::Id, subscribed_at: &DateTime, resume_at: Option, ) -> Result + std::fmt::Debug, EventsError> { // Somewhat arbitrarily, expire after 90 days. let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.broadcaster.listen(&channel.id); tx.broadcast().expire(&expire_at).await?; let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; tx.commit().await?; let resume_broadcast_at = stored_messages .last() .map(|message| message.sequence) .or(resume_at); // This should always be the case, up to integer rollover, primarily // because every message in stored_messages has a sequence not less // than `resume_at`, or `resume_at` is None. We use the last message // (if any) to decide when to resume the `live_messages` stream. // // It probably simplifies to assert!(resume_at <= resume_broadcast_at), but // this form captures more of the reasoning. assert!( (resume_at.is_none() && resume_broadcast_at.is_none()) || (stored_messages.is_empty() && resume_at == resume_broadcast_at) || resume_at < resume_broadcast_at ); // no skip_expired or resume transforms for stored_messages, as it's // constructed not to contain messages meeting either criterion. // // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; // * resume is redundant with the resume_at argument to // `tx.broadcasts().replay(…)`. let stored_messages = stream::iter(stored_messages); let live_messages = live_messages // Sure, it's temporally improbable that we'll ever skip a message // that's 90 days old, but there's no reason not to be thorough. .filter(Self::skip_expired(&expire_at)) // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // stored_messages. .filter(Self::resume(resume_broadcast_at)); Ok(stored_messages.chain(live_messages)) } fn resume( resume_at: Option, ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready { let resume_at = resume_at; move |msg| { future::ready(match resume_at { None => true, Some(resume_at) => msg.sequence > resume_at, }) } } fn skip_expired( expire_at: &DateTime, ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready { let expire_at = expire_at.to_owned(); move |msg| future::ready(msg.sent_at > expire_at) } } #[derive(Debug, thiserror::Error)] pub enum CreateError { #[error("channel named {0} already exists")] DuplicateName(String), #[error(transparent)] DatabaseError(#[from] sqlx::Error), } impl CreateError { fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self { if let Some(error) = error.as_database_error() { if error.is_unique_violation() { return Self::DuplicateName(name.into()); } } Self::from(error) } } #[derive(Debug, thiserror::Error)] pub enum InternalError { #[error(transparent)] DatabaseError(#[from] sqlx::Error), } #[derive(Debug, thiserror::Error)] pub enum EventsError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), #[error(transparent)] ResumeAtError(#[from] chrono::ParseError), #[error(transparent)] DatabaseError(#[from] sqlx::Error), }