diff options
Diffstat (limited to 'src/events/app.rs')
| -rw-r--r-- | src/events/app.rs | 139 |
1 files changed, 81 insertions, 58 deletions
diff --git a/src/events/app.rs b/src/events/app.rs index 7229551..0cdc641 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeMap; + use chrono::TimeDelta; use futures::{ future, @@ -8,7 +10,8 @@ use sqlx::sqlite::SqlitePool; use super::{ broadcaster::Broadcaster, - repo::broadcast::{self, Provider as _}, + repo::message::Provider as _, + types::{self, ChannelEvent, ResumePoint}, }; use crate::{ clock::DateTime, @@ -35,64 +38,87 @@ impl<'a> Events<'a> { channel: &channel::Id, body: &str, sent_at: &DateTime, - ) -> Result<broadcast::Message, EventsError> { + ) -> Result<types::ChannelEvent, EventsError> { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let message = tx - .broadcast() + let event = tx + .message_events() .create(login, &channel, body, sent_at) .await?; tx.commit().await?; - self.broadcaster.broadcast(&channel.id, &message); - Ok(message) + self.broadcaster.broadcast(&event); + Ok(event) } - pub async fn subscribe( - &self, - channel: &channel::Id, - subscribed_at: &DateTime, - resume_at: Option<broadcast::Sequence>, - ) -> Result<impl Stream<Item = broadcast::Message> + std::fmt::Debug, EventsError> { + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. - let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); + let expire_at = relative_to.to_owned() - TimeDelta::days(90); let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; + let expired = tx.message_events().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for (channel, message) in expired { + let sequence = tx.message_events().assign_sequence(&channel).await?; + let event = tx + .message_events() + .delete_expired(&channel, &message, sequence, relative_to) + .await?; + events.push(event); + } + + tx.commit().await?; + + for event in events { + self.broadcaster.broadcast(&event); + } + + Ok(()) + } + + pub async fn subscribe( + &self, + resume_at: ResumePoint, + ) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> { + let mut tx = self.db.begin().await?; + let channels = tx.channels().all().await?; + + let created_events = { + let resume_at = resume_at.clone(); + let channels = channels.clone(); + stream::iter( + channels + .into_iter() + .map(ChannelEvent::created) + .filter(move |event| resume_at.not_after(event)), + ) + }; // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. - let live_messages = self.broadcaster.subscribe(&channel.id); + let live_messages = self.broadcaster.subscribe(); - tx.broadcast().expire(&expire_at).await?; - let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; - tx.commit().await?; + let mut replays = BTreeMap::new(); + let mut resume_live_at = resume_at.clone(); + for channel in channels { + let replay = tx + .message_events() + .replay(&channel, resume_at.get(&channel.id)) + .await?; - let resume_broadcast_at = stored_messages - .last() - .map(|message| message.sequence) - .or(resume_at); + if let Some(last) = replay.last() { + resume_live_at.advance(last); + } - // This should always be the case, up to integer rollover, primarily - // because every message in stored_messages has a sequence not less - // than `resume_at`, or `resume_at` is None. We use the last message - // (if any) to decide when to resume the `live_messages` stream. - // - // It probably simplifies to assert!(resume_at <= resume_broadcast_at), but - // this form captures more of the reasoning. - assert!( - (resume_at.is_none() && resume_broadcast_at.is_none()) - || (stored_messages.is_empty() && resume_at == resume_broadcast_at) - || resume_at < resume_broadcast_at - ); + replays.insert(channel.id.clone(), replay); + } + + let replay = stream::select_all(replays.into_values().map(stream::iter)); // no skip_expired or resume transforms for stored_messages, as it's // constructed not to contain messages meeting either criterion. @@ -100,34 +126,31 @@ impl<'a> Events<'a> { // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; // * resume is redundant with the resume_at argument to // `tx.broadcasts().replay(…)`. - let stored_messages = stream::iter(stored_messages); let live_messages = live_messages - // Sure, it's temporally improbable that we'll ever skip a message - // that's 90 days old, but there's no reason not to be thorough. - .filter(Self::skip_expired(&expire_at)) // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // stored_messages. - .filter(Self::resume(resume_broadcast_at)); + .filter(Self::resume(resume_live_at)); + + Ok(created_events.chain(replay).chain(live_messages).scan( + resume_at, + |resume_point, event| { + match event.data { + types::ChannelEventData::Deleted(_) => resume_point.forget(&event), + _ => resume_point.advance(&event), + } - Ok(stored_messages.chain(live_messages)) + let event = types::ResumableEvent(resume_point.clone(), event); + + future::ready(Some(event)) + }, + )) } fn resume( - resume_at: Option<broadcast::Sequence>, - ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { - move |msg| { - future::ready(match resume_at { - None => true, - Some(resume_at) => msg.sequence > resume_at, - }) - } - } - fn skip_expired( - expire_at: &DateTime, - ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { - let expire_at = expire_at.to_owned(); - move |msg| future::ready(msg.sent_at > expire_at) + resume_at: ResumePoint, + ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { + move |event| future::ready(resume_at.not_after(event)) } } |
