summaryrefslogtreecommitdiff
path: root/src/events/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/app.rs')
-rw-r--r--src/events/app.rs76
1 files changed, 26 insertions, 50 deletions
diff --git a/src/events/app.rs b/src/events/app.rs
index db7f430..c15f11e 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -1,5 +1,3 @@
-use std::collections::BTreeMap;
-
use chrono::TimeDelta;
use futures::{
future,
@@ -11,7 +9,7 @@ use sqlx::sqlite::SqlitePool;
use super::{
broadcaster::Broadcaster,
repo::message::Provider as _,
- types::{self, ChannelEvent, ResumePoint},
+ types::{self, ChannelEvent},
};
use crate::{
clock::DateTime,
@@ -19,6 +17,7 @@ use crate::{
channel::{self, Provider as _},
error::NotFound as _,
login::Login,
+ sequence::{Provider as _, Sequence},
},
};
@@ -45,9 +44,10 @@ impl<'a> Events<'a> {
.by_id(channel)
.await
.not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
+ let sent_sequence = tx.sequence().next().await?;
let event = tx
.message_events()
- .create(login, &channel, body, sent_at)
+ .create(login, &channel, sent_at, sent_sequence, body)
.await?;
tx.commit().await?;
@@ -64,10 +64,10 @@ impl<'a> Events<'a> {
let mut events = Vec::with_capacity(expired.len());
for (channel, message) in expired {
- let sequence = tx.message_events().assign_sequence(&channel).await?;
+ let deleted_sequence = tx.sequence().next().await?;
let event = tx
.message_events()
- .delete_expired(&channel, &message, sequence, relative_to)
+ .delete(&channel, &message, relative_to, deleted_sequence)
.await?;
events.push(event);
}
@@ -83,42 +83,30 @@ impl<'a> Events<'a> {
pub async fn subscribe(
&self,
- resume_at: ResumePoint,
- ) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> {
- let mut tx = self.db.begin().await?;
- let channels = tx.channels().all().await?;
-
- let created_events = {
- let resume_at = resume_at.clone();
- let channels = channels.clone();
- stream::iter(
- channels
- .into_iter()
- .map(ChannelEvent::created)
- .filter(move |event| resume_at.not_after(event)),
- )
- };
-
+ resume_at: Option<Sequence>,
+ ) -> Result<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug, sqlx::Error> {
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.events.subscribe();
- let mut replays = BTreeMap::new();
- let mut resume_live_at = resume_at.clone();
- for channel in channels {
- let replay = tx
- .message_events()
- .replay(&channel, resume_at.get(&channel.id))
- .await?;
+ let mut tx = self.db.begin().await?;
+ let channels = tx.channels().replay(resume_at).await?;
- if let Some(last) = replay.last() {
- resume_live_at.advance(last);
- }
+ let channel_events = channels
+ .into_iter()
+ .map(ChannelEvent::created)
+ .filter(move |event| resume_at.map_or(true, |resume_at| event.sequence > resume_at));
- replays.insert(channel.id.clone(), replay);
- }
+ let message_events = tx.message_events().replay(resume_at).await?;
+
+ let mut replay_events = channel_events
+ .into_iter()
+ .chain(message_events.into_iter())
+ .collect::<Vec<_>>();
+ replay_events.sort_by_key(|event| event.sequence);
+ let resume_live_at = replay_events.last().map(|event| event.sequence);
- let replay = stream::select_all(replays.into_values().map(stream::iter));
+ let replay = stream::iter(replay_events);
// no skip_expired or resume transforms for stored_messages, as it's
// constructed not to contain messages meeting either criterion.
@@ -132,25 +120,13 @@ impl<'a> Events<'a> {
// stored_messages.
.filter(Self::resume(resume_live_at));
- Ok(created_events.chain(replay).chain(live_messages).scan(
- resume_at,
- |resume_point, event| {
- match event.data {
- types::ChannelEventData::Deleted(_) => resume_point.forget(&event),
- _ => resume_point.advance(&event),
- }
-
- let event = types::ResumableEvent(resume_point.clone(), event);
-
- future::ready(Some(event))
- },
- ))
+ Ok(replay.chain(live_messages))
}
fn resume(
- resume_at: ResumePoint,
+ resume_at: Option<Sequence>,
) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
- move |event| future::ready(resume_at.not_after(event))
+ move |event| future::ready(resume_at < Some(event.sequence))
}
}