diff options
Diffstat (limited to 'src/channel/app.rs')
| -rw-r--r-- | src/channel/app.rs | 30 |
1 files changed, 25 insertions, 5 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index e242c2f..c0a6d60 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,8 +2,9 @@ use std::collections::{hash_map::Entry, HashMap}; use std::sync::{Arc, Mutex, MutexGuard}; use futures::{ + future, stream::{self, StreamExt as _, TryStreamExt as _}, - Stream, + TryStream, }; use sqlx::sqlite::SqlitePool; use tokio::sync::broadcast::{channel, Sender}; @@ -55,14 +56,33 @@ impl<'a> Channels<'a> { pub async fn events( &self, channel: &ChannelId, - ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>>, BoxedError> { - let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from); + resume_at: Option<&DateTime>, + ) -> Result<impl TryStream<Ok = BroadcastMessage, Error = BoxedError>, BoxedError> { + fn skip_stale<E>( + resume_at: Option<&DateTime>, + ) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> { + let resume_at = resume_at.cloned(); + move |msg| { + future::ready(Ok(match resume_at { + None => false, + Some(resume_at) => msg.sent_at <= resume_at, + })) + } + } + + let live_messages = self + .broadcaster + .listen(channel) + .map_err(BoxedError::from) + .try_skip_while(skip_stale(resume_at)); let mut tx = self.db.begin().await?; - let stored_messages = tx.messages().for_replay(channel).await?; + let stored_messages = tx.messages().for_replay(channel, resume_at).await?; tx.commit().await?; - Ok(stream::iter(stored_messages).map(Ok).chain(live_messages)) + let stored_messages = stream::iter(stored_messages).map(Ok); + + Ok(stored_messages.chain(live_messages)) } } |
