diff options
Diffstat (limited to 'src/events/app.rs')
| -rw-r--r-- | src/events/app.rs | 76 |
1 files changed, 26 insertions, 50 deletions
diff --git a/src/events/app.rs b/src/events/app.rs index db7f430..c15f11e 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -1,5 +1,3 @@ -use std::collections::BTreeMap; - use chrono::TimeDelta; use futures::{ future, @@ -11,7 +9,7 @@ use sqlx::sqlite::SqlitePool; use super::{ broadcaster::Broadcaster, repo::message::Provider as _, - types::{self, ChannelEvent, ResumePoint}, + types::{self, ChannelEvent}, }; use crate::{ clock::DateTime, @@ -19,6 +17,7 @@ use crate::{ channel::{self, Provider as _}, error::NotFound as _, login::Login, + sequence::{Provider as _, Sequence}, }, }; @@ -45,9 +44,10 @@ impl<'a> Events<'a> { .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; + let sent_sequence = tx.sequence().next().await?; let event = tx .message_events() - .create(login, &channel, body, sent_at) + .create(login, &channel, sent_at, sent_sequence, body) .await?; tx.commit().await?; @@ -64,10 +64,10 @@ impl<'a> Events<'a> { let mut events = Vec::with_capacity(expired.len()); for (channel, message) in expired { - let sequence = tx.message_events().assign_sequence(&channel).await?; + let deleted_sequence = tx.sequence().next().await?; let event = tx .message_events() - .delete_expired(&channel, &message, sequence, relative_to) + .delete(&channel, &message, relative_to, deleted_sequence) .await?; events.push(event); } @@ -83,42 +83,30 @@ impl<'a> Events<'a> { pub async fn subscribe( &self, - resume_at: ResumePoint, - ) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> { - let mut tx = self.db.begin().await?; - let channels = tx.channels().all().await?; - - let created_events = { - let resume_at = resume_at.clone(); - let channels = channels.clone(); - stream::iter( - channels - .into_iter() - .map(ChannelEvent::created) - .filter(move |event| resume_at.not_after(event)), - ) - }; - + resume_at: Option<Sequence>, + ) -> Result<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug, sqlx::Error> { // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); - let mut replays = BTreeMap::new(); - let mut resume_live_at = resume_at.clone(); - for channel in channels { - let replay = tx - .message_events() - .replay(&channel, resume_at.get(&channel.id)) - .await?; + let mut tx = self.db.begin().await?; + let channels = tx.channels().replay(resume_at).await?; - if let Some(last) = replay.last() { - resume_live_at.advance(last); - } + let channel_events = channels + .into_iter() + .map(ChannelEvent::created) + .filter(move |event| resume_at.map_or(true, |resume_at| event.sequence > resume_at)); - replays.insert(channel.id.clone(), replay); - } + let message_events = tx.message_events().replay(resume_at).await?; + + let mut replay_events = channel_events + .into_iter() + .chain(message_events.into_iter()) + .collect::<Vec<_>>(); + replay_events.sort_by_key(|event| event.sequence); + let resume_live_at = replay_events.last().map(|event| event.sequence); - let replay = stream::select_all(replays.into_values().map(stream::iter)); + let replay = stream::iter(replay_events); // no skip_expired or resume transforms for stored_messages, as it's // constructed not to contain messages meeting either criterion. @@ -132,25 +120,13 @@ impl<'a> Events<'a> { // stored_messages. .filter(Self::resume(resume_live_at)); - Ok(created_events.chain(replay).chain(live_messages).scan( - resume_at, - |resume_point, event| { - match event.data { - types::ChannelEventData::Deleted(_) => resume_point.forget(&event), - _ => resume_point.advance(&event), - } - - let event = types::ResumableEvent(resume_point.clone(), event); - - future::ready(Some(event)) - }, - )) + Ok(replay.chain(live_messages)) } fn resume( - resume_at: ResumePoint, + resume_at: Option<Sequence>, ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { - move |event| future::ready(resume_at.not_after(event)) + move |event| future::ready(resume_at < Some(event.sequence)) } } |
