summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-13 22:30:02 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-13 23:12:31 -0400
commit407ca8df6284ce1a4c649b018c7326fd195bbd26 (patch)
tree876091c17efbd765a4c7ef339548c0ff4dfb96d5 /src/channel/app.rs
parent388a3d5a925aef7ff39339454ae0d720e05f038e (diff)
Support Last-Event-Id as a method of resuming channel events after a disconnect
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs30
1 files changed, 25 insertions, 5 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index e242c2f..c0a6d60 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -2,8 +2,9 @@ use std::collections::{hash_map::Entry, HashMap};
use std::sync::{Arc, Mutex, MutexGuard};
use futures::{
+ future,
stream::{self, StreamExt as _, TryStreamExt as _},
- Stream,
+ TryStream,
};
use sqlx::sqlite::SqlitePool;
use tokio::sync::broadcast::{channel, Sender};
@@ -55,14 +56,33 @@ impl<'a> Channels<'a> {
pub async fn events(
&self,
channel: &ChannelId,
- ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>>, BoxedError> {
- let live_messages = self.broadcaster.listen(channel).map_err(BoxedError::from);
+ resume_at: Option<&DateTime>,
+ ) -> Result<impl TryStream<Ok = BroadcastMessage, Error = BoxedError>, BoxedError> {
+ fn skip_stale<E>(
+ resume_at: Option<&DateTime>,
+ ) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> {
+ let resume_at = resume_at.cloned();
+ move |msg| {
+ future::ready(Ok(match resume_at {
+ None => false,
+ Some(resume_at) => msg.sent_at <= resume_at,
+ }))
+ }
+ }
+
+ let live_messages = self
+ .broadcaster
+ .listen(channel)
+ .map_err(BoxedError::from)
+ .try_skip_while(skip_stale(resume_at));
let mut tx = self.db.begin().await?;
- let stored_messages = tx.messages().for_replay(channel).await?;
+ let stored_messages = tx.messages().for_replay(channel, resume_at).await?;
tx.commit().await?;
- Ok(stream::iter(stored_messages).map(Ok).chain(live_messages))
+ let stored_messages = stream::iter(stored_messages).map(Ok);
+
+ Ok(stored_messages.chain(live_messages))
}
}