diff options
Diffstat (limited to 'src/event/app.rs')
| -rw-r--r-- | src/event/app.rs | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/src/event/app.rs b/src/event/app.rs new file mode 100644 index 0000000..b5f2ecc --- /dev/null +++ b/src/event/app.rs @@ -0,0 +1,137 @@ +use chrono::TimeDelta; +use futures::{ + future, + stream::{self, StreamExt as _}, + Stream, +}; +use sqlx::sqlite::SqlitePool; + +use super::{ + broadcaster::Broadcaster, + repo::message::Provider as _, + types::{self, ChannelEvent}, +}; +use crate::{ + channel, + clock::DateTime, + event::Sequence, + login::Login, + repo::{channel::Provider as _, error::NotFound as _, sequence::Provider as _}, +}; + +pub struct Events<'a> { + db: &'a SqlitePool, + events: &'a Broadcaster, +} + +impl<'a> Events<'a> { + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } + } + + pub async fn send( + &self, + login: &Login, + channel: &channel::Id, + body: &str, + sent_at: &DateTime, + ) -> Result<types::ChannelEvent, EventsError> { + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; + let sent_sequence = tx.sequence().next().await?; + let event = tx + .message_events() + .create(login, &channel, sent_at, sent_sequence, body) + .await?; + tx.commit().await?; + + self.events.broadcast(&event); + Ok(event) + } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + let expired = tx.message_events().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for (channel, message) in expired { + let deleted_sequence = tx.sequence().next().await?; + let event = tx + .message_events() + .delete(&channel, &message, relative_to, deleted_sequence) + .await?; + events.push(event); + } + + tx.commit().await?; + + for event in events { + self.events.broadcast(&event); + } + + Ok(()) + } + + pub async fn subscribe( + &self, + resume_at: Option<Sequence>, + ) -> Result<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug, sqlx::Error> { + // Subscribe before retrieving, to catch messages broadcast while we're + // querying the DB. We'll prune out duplicates later. + let live_messages = self.events.subscribe(); + + let mut tx = self.db.begin().await?; + let channels = tx.channels().replay(resume_at).await?; + + let channel_events = channels + .into_iter() + .map(ChannelEvent::created) + .filter(move |event| resume_at.map_or(true, |resume_at| event.sequence > resume_at)); + + let message_events = tx.message_events().replay(resume_at).await?; + + let mut replay_events = channel_events + .into_iter() + .chain(message_events.into_iter()) + .collect::<Vec<_>>(); + replay_events.sort_by_key(|event| event.sequence); + let resume_live_at = replay_events.last().map(|event| event.sequence); + + let replay = stream::iter(replay_events); + + // no skip_expired or resume transforms for stored_messages, as it's + // constructed not to contain messages meeting either criterion. + // + // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; + // * resume is redundant with the resume_at argument to + // `tx.broadcasts().replay(…)`. + let live_messages = live_messages + // Filtering on the broadcast resume point filters out messages + // before resume_at, and filters out messages duplicated from + // stored_messages. + .filter(Self::resume(resume_live_at)); + + Ok(replay.chain(live_messages)) + } + + fn resume( + resume_at: Option<Sequence>, + ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { + move |event| future::ready(resume_at < Some(event.sequence)) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum EventsError { + #[error("channel {0} not found")] + ChannelNotFound(channel::Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} |
