summaryrefslogtreecommitdiff
path: root/src/event/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/app.rs')
-rw-r--r--src/event/app.rs18
1 files changed, 10 insertions, 8 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
index 3d35f1a..5e9e79a 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -42,10 +42,10 @@ impl<'a> Events<'a> {
.by_id(channel)
.await
.not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
- let sent_sequence = tx.sequence().next().await?;
+ let sent = tx.sequence().next(sent_at).await?;
let event = tx
.message_events()
- .create(login, &channel, sent_at, sent_sequence, body)
+ .create(login, &channel, &sent, body)
.await?;
tx.commit().await?;
@@ -62,10 +62,10 @@ impl<'a> Events<'a> {
let mut events = Vec::with_capacity(expired.len());
for (channel, message) in expired {
- let deleted_sequence = tx.sequence().next().await?;
+ let deleted = tx.sequence().next(relative_to).await?;
let event = tx
.message_events()
- .delete(&channel, &message, relative_to, deleted_sequence)
+ .delete(&channel, &message, &deleted)
.await?;
events.push(event);
}
@@ -93,7 +93,9 @@ impl<'a> Events<'a> {
let channel_events = channels
.into_iter()
.map(ChannelEvent::created)
- .filter(move |event| resume_at.map_or(true, |resume_at| event.sequence > resume_at));
+ .filter(move |event| {
+ resume_at.map_or(true, |resume_at| Sequence::from(event) > resume_at)
+ });
let message_events = tx.message_events().replay(resume_at).await?;
@@ -101,8 +103,8 @@ impl<'a> Events<'a> {
.into_iter()
.chain(message_events.into_iter())
.collect::<Vec<_>>();
- replay_events.sort_by_key(|event| event.sequence);
- let resume_live_at = replay_events.last().map(|event| event.sequence);
+ replay_events.sort_by_key(|event| Sequence::from(event));
+ let resume_live_at = replay_events.last().map(Sequence::from);
let replay = stream::iter(replay_events);
@@ -124,7 +126,7 @@ impl<'a> Events<'a> {
fn resume(
resume_at: Option<Sequence>,
) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
- move |event| future::ready(resume_at < Some(event.sequence))
+ move |event| future::ready(resume_at < Some(Sequence::from(event)))
}
}