use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, Stream, }; use sqlx::sqlite::SqlitePool; use crate::{ clock::DateTime, events::{ app::Broadcaster, repo::broadcast::{self, Provider as _}, }, repo::{ channel::{self, Channel, Provider as _}, error::NotFound as _, login::Login, }, }; pub struct Channels<'a> { db: &'a SqlitePool, broadcaster: &'a Broadcaster, } impl<'a> Channels<'a> { pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { Self { db, broadcaster } } pub async fn create(&self, name: &str) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() .create(name) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; self.broadcaster.register_channel(&channel.id); tx.commit().await?; Ok(channel) } pub async fn all(&self) -> Result, InternalError> { let mut tx = self.db.begin().await?; let channels = tx.channels().all().await?; tx.commit().await?; Ok(channels) } pub async fn send( &self, login: &Login, channel: &channel::Id, body: &str, sent_at: &DateTime, ) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; let message = tx .broadcast() .create(login, &channel, body, sent_at) .await?; tx.commit().await?; self.broadcaster.broadcast(&channel.id, &message); Ok(message) } pub async fn events( &self, channel: &channel::Id, subscribed_at: &DateTime, resume_at: Option<&str>, ) -> Result + std::fmt::Debug, EventsError> { // Somewhat arbitrarily, expire after 90 days. let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); let resume_at = resume_at .map(chrono::DateTime::parse_from_rfc3339) .transpose()? .map(|resume_at| resume_at.to_utc()); let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; let live_messages = self .broadcaster .listen(&channel.id) .filter(Self::skip_stale(resume_at.as_ref())) .filter(Self::skip_expired(&expire_at)); tx.broadcast().expire(&expire_at).await?; let stored_messages = tx.broadcast().replay(&channel, resume_at.as_ref()).await?; tx.commit().await?; let stored_messages = stream::iter(stored_messages); Ok(stored_messages.chain(live_messages)) } fn skip_stale( resume_at: Option<&DateTime>, ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready { let resume_at = resume_at.cloned(); move |msg| { future::ready(match resume_at { None => true, Some(resume_at) => msg.sent_at > resume_at, }) } } fn skip_expired( expire_at: &DateTime, ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready { let expire_at = expire_at.to_owned(); move |msg| future::ready(msg.sent_at > expire_at) } } #[derive(Debug, thiserror::Error)] pub enum CreateError { #[error("channel named {0} already exists")] DuplicateName(String), #[error(transparent)] DatabaseError(#[from] sqlx::Error), } impl CreateError { fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self { if let Some(error) = error.as_database_error() { if error.is_unique_violation() { return Self::DuplicateName(name.into()); } } Self::from(error) } } #[derive(Debug, thiserror::Error)] pub enum InternalError { #[error(transparent)] DatabaseError(#[from] sqlx::Error), } #[derive(Debug, thiserror::Error)] pub enum EventsError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), #[error(transparent)] ResumeAtError(#[from] chrono::ParseError), #[error(transparent)] DatabaseError(#[from] sqlx::Error), }