diff options
Diffstat (limited to 'src/events/app.rs')
| -rw-r--r-- | src/events/app.rs | 93 |
1 files changed, 46 insertions, 47 deletions
diff --git a/src/events/app.rs b/src/events/app.rs index 7229551..043a29b 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeMap; + use chrono::TimeDelta; use futures::{ future, @@ -8,7 +10,8 @@ use sqlx::sqlite::SqlitePool; use super::{ broadcaster::Broadcaster, - repo::broadcast::{self, Provider as _}, + repo::message::Provider as _, + types::{self, ResumePoint}, }; use crate::{ clock::DateTime, @@ -35,64 +38,56 @@ impl<'a> Events<'a> { channel: &channel::Id, body: &str, sent_at: &DateTime, - ) -> Result<broadcast::Message, EventsError> { + ) -> Result<types::ChannelEvent, EventsError> { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let message = tx - .broadcast() + let event = tx + .message_events() .create(login, &channel, body, sent_at) .await?; tx.commit().await?; - self.broadcaster.broadcast(&channel.id, &message); - Ok(message) + self.broadcaster.broadcast(&event); + Ok(event) } pub async fn subscribe( &self, - channel: &channel::Id, subscribed_at: &DateTime, - resume_at: Option<broadcast::Sequence>, - ) -> Result<impl Stream<Item = broadcast::Message> + std::fmt::Debug, EventsError> { + resume_at: ResumePoint, + ) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; + let channels = tx.channels().all().await?; // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. - let live_messages = self.broadcaster.subscribe(&channel.id); + let live_messages = self.broadcaster.subscribe(); - tx.broadcast().expire(&expire_at).await?; - let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; - tx.commit().await?; + tx.message_events().expire(&expire_at).await?; - let resume_broadcast_at = stored_messages - .last() - .map(|message| message.sequence) - .or(resume_at); + let mut replays = BTreeMap::new(); + let mut resume_live_at = resume_at.clone(); + for channel in channels { + let replay = tx + .message_events() + .replay(&channel, resume_at.get(&channel.id)) + .await?; - // This should always be the case, up to integer rollover, primarily - // because every message in stored_messages has a sequence not less - // than `resume_at`, or `resume_at` is None. We use the last message - // (if any) to decide when to resume the `live_messages` stream. - // - // It probably simplifies to assert!(resume_at <= resume_broadcast_at), but - // this form captures more of the reasoning. - assert!( - (resume_at.is_none() && resume_broadcast_at.is_none()) - || (stored_messages.is_empty() && resume_at == resume_broadcast_at) - || resume_at < resume_broadcast_at - ); + if let Some(last) = replay.last() { + resume_live_at.advance(&channel.id, last.sequence); + } + + replays.insert(channel.id.clone(), replay); + } + + let replay = stream::select_all(replays.into_values().map(stream::iter)); // no skip_expired or resume transforms for stored_messages, as it's // constructed not to contain messages meeting either criterion. @@ -100,7 +95,6 @@ impl<'a> Events<'a> { // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; // * resume is redundant with the resume_at argument to // `tx.broadcasts().replay(…)`. - let stored_messages = stream::iter(stored_messages); let live_messages = live_messages // Sure, it's temporally improbable that we'll ever skip a message // that's 90 days old, but there's no reason not to be thorough. @@ -108,26 +102,31 @@ impl<'a> Events<'a> { // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // stored_messages. - .filter(Self::resume(resume_broadcast_at)); + .filter(Self::resume(resume_live_at)); - Ok(stored_messages.chain(live_messages)) + Ok(replay + .chain(live_messages) + .scan(resume_at, |resume_point, event| { + let channel = &event.channel.id; + let sequence = event.sequence; + resume_point.advance(channel, sequence); + + let event = types::ResumableEvent(resume_point.clone(), event); + + future::ready(Some(event)) + })) } fn resume( - resume_at: Option<broadcast::Sequence>, - ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { - move |msg| { - future::ready(match resume_at { - None => true, - Some(resume_at) => msg.sequence > resume_at, - }) - } + resume_at: ResumePoint, + ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { + move |event| future::ready(resume_at < event.sequence()) } fn skip_expired( expire_at: &DateTime, - ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { + ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { let expire_at = expire_at.to_owned(); - move |msg| future::ready(msg.sent_at > expire_at) + move |event| future::ready(expire_at < event.at) } } |
