summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-01 22:43:18 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-01 23:14:49 -0400
commitd171a258ad2119e39cb715f8800031fff16967dc (patch)
tree453cf4c65fa18ff98ef13d9730f1a0f74ff68540 /src/channel
parentb8392a5fe824eff46f912a58885546e7b0f37e6f (diff)
Provide a resume point to bridge clients from state snapshots to the event sequence.
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs6
-rw-r--r--src/channel/routes.rs15
-rw-r--r--src/channel/routes/test/list.rs7
-rw-r--r--src/channel/routes/test/on_create.rs2
4 files changed, 21 insertions, 9 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 88f4170..d89e733 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -6,7 +6,7 @@ use crate::{
events::{broadcaster::Broadcaster, types::ChannelEvent},
repo::{
channel::{Channel, Provider as _},
- sequence::Provider as _,
+ sequence::{Provider as _, Sequence},
},
};
@@ -36,9 +36,9 @@ impl<'a> Channels<'a> {
Ok(channel)
}
- pub async fn all(&self) -> Result<Vec<Channel>, InternalError> {
+ pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> {
let mut tx = self.db.begin().await?;
- let channels = tx.channels().all().await?;
+ let channels = tx.channels().all(resume_point).await?;
tx.commit().await?;
Ok(channels)
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index 1f8db5a..067d213 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -5,6 +5,7 @@ use axum::{
routing::{get, post},
Router,
};
+use axum_extra::extract::Query;
use super::app;
use crate::{
@@ -15,6 +16,7 @@ use crate::{
repo::{
channel::{self, Channel},
login::Login,
+ sequence::Sequence,
},
};
@@ -28,8 +30,17 @@ pub fn router() -> Router<App> {
.route("/api/channels/:channel", post(on_send))
}
-async fn list(State(app): State<App>, _: Login) -> Result<Channels, Internal> {
- let channels = app.channels().all().await?;
+#[derive(Default, serde::Deserialize)]
+struct ListQuery {
+ resume_point: Option<Sequence>,
+}
+
+async fn list(
+ State(app): State<App>,
+ _: Login,
+ Query(query): Query<ListQuery>,
+) -> Result<Channels, Internal> {
+ let channels = app.channels().all(query.resume_point).await?;
let response = Channels(channels);
Ok(response)
diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs
index bc94024..f15a53c 100644
--- a/src/channel/routes/test/list.rs
+++ b/src/channel/routes/test/list.rs
@@ -1,4 +1,5 @@
use axum::extract::State;
+use axum_extra::extract::Query;
use crate::{channel::routes, test::fixtures};
@@ -11,7 +12,7 @@ async fn empty_list() {
// Call the endpoint
- let routes::Channels(channels) = routes::list(State(app), viewer)
+ let routes::Channels(channels) = routes::list(State(app), viewer, Query::default())
.await
.expect("always succeeds");
@@ -30,7 +31,7 @@ async fn one_channel() {
// Call the endpoint
- let routes::Channels(channels) = routes::list(State(app), viewer)
+ let routes::Channels(channels) = routes::list(State(app), viewer, Query::default())
.await
.expect("always succeeds");
@@ -52,7 +53,7 @@ async fn multiple_channels() {
// Call the endpoint
- let routes::Channels(response_channels) = routes::list(State(app), viewer)
+ let routes::Channels(response_channels) = routes::list(State(app), viewer, Query::default())
.await
.expect("always succeeds");
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index 5deb88a..72980ac 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -33,7 +33,7 @@ async fn new_channel() {
// Verify the semantics
- let channels = app.channels().all().await.expect("always succeeds");
+ let channels = app.channels().all(None).await.expect("always succeeds");
assert!(channels.contains(&response_channel));
let mut events = app