diff options
Diffstat (limited to 'src/event/app.rs')
| -rw-r--r-- | src/event/app.rs | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/src/event/app.rs b/src/event/app.rs index 3d35f1a..5e9e79a 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -42,10 +42,10 @@ impl<'a> Events<'a> { .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let sent_sequence = tx.sequence().next().await?; + let sent = tx.sequence().next(sent_at).await?; let event = tx .message_events() - .create(login, &channel, sent_at, sent_sequence, body) + .create(login, &channel, &sent, body) .await?; tx.commit().await?; @@ -62,10 +62,10 @@ impl<'a> Events<'a> { let mut events = Vec::with_capacity(expired.len()); for (channel, message) in expired { - let deleted_sequence = tx.sequence().next().await?; + let deleted = tx.sequence().next(relative_to).await?; let event = tx .message_events() - .delete(&channel, &message, relative_to, deleted_sequence) + .delete(&channel, &message, &deleted) .await?; events.push(event); } @@ -93,7 +93,9 @@ impl<'a> Events<'a> { let channel_events = channels .into_iter() .map(ChannelEvent::created) - .filter(move |event| resume_at.map_or(true, |resume_at| event.sequence > resume_at)); + .filter(move |event| { + resume_at.map_or(true, |resume_at| Sequence::from(event) > resume_at) + }); let message_events = tx.message_events().replay(resume_at).await?; @@ -101,8 +103,8 @@ impl<'a> Events<'a> { .into_iter() .chain(message_events.into_iter()) .collect::<Vec<_>>(); - replay_events.sort_by_key(|event| event.sequence); - let resume_live_at = replay_events.last().map(|event| event.sequence); + replay_events.sort_by_key(|event| Sequence::from(event)); + let resume_live_at = replay_events.last().map(Sequence::from); let replay = stream::iter(replay_events); @@ -124,7 +126,7 @@ impl<'a> Events<'a> { fn resume( resume_at: Option<Sequence>, ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { - move |event| future::ready(resume_at < Some(event.sequence)) + move |event| future::ready(resume_at < Some(Sequence::from(event))) } } |
