diff options
Diffstat (limited to 'src/events/app/events.rs')
| -rw-r--r-- | src/events/app/events.rs | 141 |
1 files changed, 0 insertions, 141 deletions
diff --git a/src/events/app/events.rs b/src/events/app/events.rs deleted file mode 100644 index 8b76994..0000000 --- a/src/events/app/events.rs +++ /dev/null @@ -1,141 +0,0 @@ -use chrono::TimeDelta; -use futures::{ - future, - stream::{self, StreamExt as _}, - Stream, -}; -use sqlx::sqlite::SqlitePool; - -use super::Broadcaster; -use crate::{ - clock::DateTime, - events::{ - app::EventsError, - repo::broadcast::{self, Provider as _}, - }, - repo::{ - channel::{self, Provider as _}, - error::NotFound as _, - login::Login, - }, -}; - -pub struct Events<'a> { - db: &'a SqlitePool, - broadcaster: &'a Broadcaster, -} - -impl<'a> Events<'a> { - pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { - Self { db, broadcaster } - } - - pub async fn send( - &self, - login: &Login, - channel: &channel::Id, - body: &str, - sent_at: &DateTime, - ) -> Result<broadcast::Message, Error> { - let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let message = tx - .broadcast() - .create(login, &channel, body, sent_at) - .await?; - tx.commit().await?; - - self.broadcaster.broadcast(&channel.id, &message); - Ok(message) - } - - pub async fn subscribe( - &self, - channel: &channel::Id, - subscribed_at: &DateTime, - resume_at: Option<broadcast::Sequence>, - ) -> Result<impl Stream<Item = broadcast::Message> + std::fmt::Debug, Error> { - // Somewhat arbitrarily, expire after 90 days. - let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); - - let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - - // Subscribe before retrieving, to catch messages broadcast while we're - // querying the DB. We'll prune out duplicates later. - let live_messages = self.broadcaster.subscribe(&channel.id); - - tx.broadcast().expire(&expire_at).await?; - let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; - tx.commit().await?; - - let resume_broadcast_at = stored_messages - .last() - .map(|message| message.sequence) - .or(resume_at); - - // This should always be the case, up to integer rollover, primarily - // because every message in stored_messages has a sequence not less - // than `resume_at`, or `resume_at` is None. We use the last message - // (if any) to decide when to resume the `live_messages` stream. - // - // It probably simplifies to assert!(resume_at <= resume_broadcast_at), but - // this form captures more of the reasoning. - assert!( - (resume_at.is_none() && resume_broadcast_at.is_none()) - || (stored_messages.is_empty() && resume_at == resume_broadcast_at) - || resume_at < resume_broadcast_at - ); - - // no skip_expired or resume transforms for stored_messages, as it's - // constructed not to contain messages meeting either criterion. - // - // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; - // * resume is redundant with the resume_at argument to - // `tx.broadcasts().replay(…)`. - let stored_messages = stream::iter(stored_messages); - let live_messages = live_messages - // Sure, it's temporally improbable that we'll ever skip a message - // that's 90 days old, but there's no reason not to be thorough. - .filter(Self::skip_expired(&expire_at)) - // Filtering on the broadcast resume point filters out messages - // before resume_at, and filters out messages duplicated from - // stored_messages. - .filter(Self::resume(resume_broadcast_at)); - - Ok(stored_messages.chain(live_messages)) - } - - fn resume( - resume_at: Option<broadcast::Sequence>, - ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { - move |msg| { - future::ready(match resume_at { - None => true, - Some(resume_at) => msg.sequence > resume_at, - }) - } - } - fn skip_expired( - expire_at: &DateTime, - ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { - let expire_at = expire_at.to_owned(); - move |msg| future::ready(msg.sent_at > expire_at) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("channel {0} not found")] - ChannelNotFound(channel::Id), - #[error(transparent)] - DatabaseError(#[from] sqlx::Error), -} |
