summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-01 22:43:18 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-01 23:14:49 -0400
commitd171a258ad2119e39cb715f8800031fff16967dc (patch)
tree453cf4c65fa18ff98ef13d9730f1a0f74ff68540
parentb8392a5fe824eff46f912a58885546e7b0f37e6f (diff)
Provide a resume point to bridge clients from state snapshots to the event sequence.
-rw-r--r--.sqlx/query-566ee1b8e4e66e78b28675a13fa4f719d82d465e4525557698914a661d39cdb4.json20
-rw-r--r--.sqlx/query-cda3a4a974eb986ebe26838143f6b5d16dbeb19ad13ac36dcb40851f0af238e8.json (renamed from .sqlx/query-7f6b9c7d4ef3f540d594318a7a66fa8f9e3ddcf6d041be8d834db58f66a5aa88.json)6
-rw-r--r--docs/api.md13
-rw-r--r--src/channel/app.rs6
-rw-r--r--src/channel/routes.rs15
-rw-r--r--src/channel/routes/test/list.rs7
-rw-r--r--src/channel/routes/test/on_create.rs2
-rw-r--r--src/events/routes.rs11
-rw-r--r--src/events/routes/test.rs67
-rw-r--r--src/login/app.rs9
-rw-r--r--src/login/routes.rs9
-rw-r--r--src/login/routes/test/boot.rs7
-rw-r--r--src/repo/channel.rs7
-rw-r--r--src/repo/sequence.rs52
14 files changed, 176 insertions, 55 deletions
diff --git a/.sqlx/query-566ee1b8e4e66e78b28675a13fa4f719d82d465e4525557698914a661d39cdb4.json b/.sqlx/query-566ee1b8e4e66e78b28675a13fa4f719d82d465e4525557698914a661d39cdb4.json
new file mode 100644
index 0000000..8d2fc72
--- /dev/null
+++ b/.sqlx/query-566ee1b8e4e66e78b28675a13fa4f719d82d465e4525557698914a661d39cdb4.json
@@ -0,0 +1,20 @@
+{
+ "db_name": "SQLite",
+ "query": "\n select last_value as \"last_value: Sequence\"\n from event_sequence\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "last_value: Sequence",
+ "ordinal": 0,
+ "type_info": "Integer"
+ }
+ ],
+ "parameters": {
+ "Right": 0
+ },
+ "nullable": [
+ false
+ ]
+ },
+ "hash": "566ee1b8e4e66e78b28675a13fa4f719d82d465e4525557698914a661d39cdb4"
+}
diff --git a/.sqlx/query-7f6b9c7d4ef3f540d594318a7a66fa8f9e3ddcf6d041be8d834db58f66a5aa88.json b/.sqlx/query-cda3a4a974eb986ebe26838143f6b5d16dbeb19ad13ac36dcb40851f0af238e8.json
index 3cc33cf..bce6a88 100644
--- a/.sqlx/query-7f6b9c7d4ef3f540d594318a7a66fa8f9e3ddcf6d041be8d834db58f66a5aa88.json
+++ b/.sqlx/query-cda3a4a974eb986ebe26838143f6b5d16dbeb19ad13ac36dcb40851f0af238e8.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n select\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\",\n created_sequence as \"created_sequence: Sequence\"\n from channel\n order by channel.name\n ",
+ "query": "\n select\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\",\n created_sequence as \"created_sequence: Sequence\"\n from channel\n where coalesce(created_sequence <= $1, true)\n order by channel.name\n ",
"describe": {
"columns": [
{
@@ -25,7 +25,7 @@
}
],
"parameters": {
- "Right": 0
+ "Right": 1
},
"nullable": [
false,
@@ -34,5 +34,5 @@
false
]
},
- "hash": "7f6b9c7d4ef3f540d594318a7a66fa8f9e3ddcf6d041be8d834db58f66a5aa88"
+ "hash": "cda3a4a974eb986ebe26838143f6b5d16dbeb19ad13ac36dcb40851f0af238e8"
}
diff --git a/docs/api.md b/docs/api.md
index e18c6d5..5adf28d 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -23,7 +23,8 @@ Returns information needed to boot the client. Also the recommended way to check
"login": {
"name": "example username",
"id": "L1234abcd",
- }
+ },
+ "resume_point": "1312",
}
```
@@ -80,6 +81,10 @@ Channels are the containers for conversations. The API supports listing channels
Lists channels.
+#### Query parameters
+
+This endpoint accepts an optional `resume_point` query parameter. If provided, the value must be the value obtained from the `/api/boot` method. This parameter will restrict the returned list to channels as they existed at a fixed point in time, with any later changes only appearing in the event stream.
+
#### On success
Responds with a list of channel objects, one per channel:
@@ -152,9 +157,13 @@ Subscribes to events. This endpoint returns an `application/event-stream` respon
The returned stream may terminate, to limit the number of outstanding messages held by the server. Clients can and should repeat the request, using the `Last-Event-Id` header to resume from where they left off. Events will be replayed from that point, and the stream will resume.
+#### Query parameters
+
+This endpoint accepts an optional `resume_point` query parameter. If provided, the value must be the value obtained from the `/api/boot` method. This parameter start the returned stream immediately after the `resume_point`.
+
#### Request headers
-This endpoint accepts an optional `Last-Event-Id` header for resuming an interrupted stream. If this header is provided, it must be set to the `id` field sent with the last event the client has processed. When `Last-Event-Id` is sent, the response will resume immediately after the corresponding event. If this header is omitted, then the stream will start from the beginning.
+This endpoint accepts an optional `Last-Event-Id` header for resuming an interrupted stream. If this header is provided, it must be set to the `id` field sent with the last event the client has processed. When `Last-Event-Id` is sent, the response will resume immediately after the corresponding event. This header takes precedence over the `resume_point` query parameter; if neither is provided, then event playback starts at the beginning of time (_you have been warned_).
If you're using a browser's `EventSource` API, this is handled for you automatically.
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 88f4170..d89e733 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -6,7 +6,7 @@ use crate::{
events::{broadcaster::Broadcaster, types::ChannelEvent},
repo::{
channel::{Channel, Provider as _},
- sequence::Provider as _,
+ sequence::{Provider as _, Sequence},
},
};
@@ -36,9 +36,9 @@ impl<'a> Channels<'a> {
Ok(channel)
}
- pub async fn all(&self) -> Result<Vec<Channel>, InternalError> {
+ pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> {
let mut tx = self.db.begin().await?;
- let channels = tx.channels().all().await?;
+ let channels = tx.channels().all(resume_point).await?;
tx.commit().await?;
Ok(channels)
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index 1f8db5a..067d213 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -5,6 +5,7 @@ use axum::{
routing::{get, post},
Router,
};
+use axum_extra::extract::Query;
use super::app;
use crate::{
@@ -15,6 +16,7 @@ use crate::{
repo::{
channel::{self, Channel},
login::Login,
+ sequence::Sequence,
},
};
@@ -28,8 +30,17 @@ pub fn router() -> Router<App> {
.route("/api/channels/:channel", post(on_send))
}
-async fn list(State(app): State<App>, _: Login) -> Result<Channels, Internal> {
- let channels = app.channels().all().await?;
+#[derive(Default, serde::Deserialize)]
+struct ListQuery {
+ resume_point: Option<Sequence>,
+}
+
+async fn list(
+ State(app): State<App>,
+ _: Login,
+ Query(query): Query<ListQuery>,
+) -> Result<Channels, Internal> {
+ let channels = app.channels().all(query.resume_point).await?;
let response = Channels(channels);
Ok(response)
diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs
index bc94024..f15a53c 100644
--- a/src/channel/routes/test/list.rs
+++ b/src/channel/routes/test/list.rs
@@ -1,4 +1,5 @@
use axum::extract::State;
+use axum_extra::extract::Query;
use crate::{channel::routes, test::fixtures};
@@ -11,7 +12,7 @@ async fn empty_list() {
// Call the endpoint
- let routes::Channels(channels) = routes::list(State(app), viewer)
+ let routes::Channels(channels) = routes::list(State(app), viewer, Query::default())
.await
.expect("always succeeds");
@@ -30,7 +31,7 @@ async fn one_channel() {
// Call the endpoint
- let routes::Channels(channels) = routes::list(State(app), viewer)
+ let routes::Channels(channels) = routes::list(State(app), viewer, Query::default())
.await
.expect("always succeeds");
@@ -52,7 +53,7 @@ async fn multiple_channels() {
// Call the endpoint
- let routes::Channels(response_channels) = routes::list(State(app), viewer)
+ let routes::Channels(response_channels) = routes::list(State(app), viewer, Query::default())
.await
.expect("always succeeds");
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index 5deb88a..72980ac 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -33,7 +33,7 @@ async fn new_channel() {
// Verify the semantics
- let channels = app.channels().all().await.expect("always succeeds");
+ let channels = app.channels().all(None).await.expect("always succeeds");
assert!(channels.contains(&response_channel));
let mut events = app
diff --git a/src/events/routes.rs b/src/events/routes.rs
index e3a959f..d81c7fb 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -7,6 +7,7 @@ use axum::{
routing::get,
Router,
};
+use axum_extra::extract::Query;
use futures::stream::{Stream, StreamExt as _};
use super::{extract::LastEventId, types};
@@ -24,12 +25,20 @@ pub fn router() -> Router<App> {
Router::new().route("/api/events", get(events))
}
+#[derive(Default, serde::Deserialize)]
+struct EventsQuery {
+ resume_point: Option<Sequence>,
+}
+
async fn events(
State(app): State<App>,
identity: Identity,
last_event_id: Option<LastEventId<Sequence>>,
+ Query(query): Query<EventsQuery>,
) -> Result<Events<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug>, EventsError> {
- let resume_at = last_event_id.map(LastEventId::into_inner);
+ let resume_at = last_event_id
+ .map(LastEventId::into_inner)
+ .or(query.resume_point);
let stream = app.events().subscribe(resume_at).await?;
let stream = app.logins().limit_stream(identity.token, stream).await?;
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index 1cfca4f..11f01b8 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -1,4 +1,5 @@
use axum::extract::State;
+use axum_extra::extract::Query;
use futures::{
future,
stream::{self, StreamExt as _},
@@ -22,7 +23,7 @@ async fn includes_historical_message() {
let subscriber_creds = fixtures::login::create_with_password(&app).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -49,9 +50,10 @@ async fn includes_live_message() {
let subscriber_creds = fixtures::login::create_with_password(&app).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app.clone()), subscriber, None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) =
+ routes::events(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
// Verify the semantics
@@ -94,7 +96,7 @@ async fn includes_multiple_channels() {
let subscriber_creds = fixtures::login::create_with_password(&app).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -130,7 +132,7 @@ async fn sequential_messages() {
let subscriber_creds = fixtures::login::create_with_password(&app).await;
let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app), subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
@@ -172,9 +174,14 @@ async fn resumes_from() {
let resume_at = {
// First subscription
- let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
let event = events
.filter(fixtures::filter::messages())
@@ -189,9 +196,14 @@ async fn resumes_from() {
};
// Resume after disconnect
- let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into()))
- .await
- .expect("subscribe never fails");
+ let routes::Events(resumed) = routes::events(
+ State(app),
+ subscriber,
+ Some(resume_at.into()),
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
@@ -242,9 +254,14 @@ async fn serial_resume() {
];
// First subscription
- let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
let events = events
.filter(fixtures::filter::messages())
@@ -277,6 +294,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
+ Query::default(),
)
.await
.expect("subscribe never fails");
@@ -312,6 +330,7 @@ async fn serial_resume() {
State(app.clone()),
subscriber.clone(),
Some(resume_at.into()),
+ Query::default(),
)
.await
.expect("subscribe never fails");
@@ -345,9 +364,10 @@ async fn terminates_on_token_expiry() {
let subscriber =
fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await;
- let routes::Events(events) = routes::events(State(app.clone()), subscriber, None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) =
+ routes::events(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
// Verify the resulting stream's behaviour
@@ -387,9 +407,14 @@ async fn terminates_on_logout() {
let subscriber =
fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await;
- let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscriber.clone(),
+ None,
+ Query::default(),
+ )
+ .await
+ .expect("subscribe never fails");
// Verify the resulting stream's behaviour
diff --git a/src/login/app.rs b/src/login/app.rs
index 95f0a07..f1dffb9 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -13,6 +13,7 @@ use crate::{
repo::{
error::NotFound as _,
login::{Login, Provider as _},
+ sequence::{Provider as _, Sequence},
token::{self, Provider as _},
},
};
@@ -27,6 +28,14 @@ impl<'a> Logins<'a> {
Self { db, logins }
}
+ pub async fn boot_point(&self) -> Result<Sequence, sqlx::Error> {
+ let mut tx = self.db.begin().await?;
+ let sequence = tx.sequence().current().await?;
+ tx.commit().await?;
+
+ Ok(sequence)
+ }
+
pub async fn login(
&self,
name: &str,
diff --git a/src/login/routes.rs b/src/login/routes.rs
index d7cb9b1..ef75871 100644
--- a/src/login/routes.rs
+++ b/src/login/routes.rs
@@ -26,13 +26,18 @@ pub fn router() -> Router<App> {
.route("/api/auth/logout", post(on_logout))
}
-async fn boot(login: Login) -> Boot {
- Boot { login }
+async fn boot(State(app): State<App>, login: Login) -> Result<Boot, Internal> {
+ let resume_point = app.logins().boot_point().await?;
+ Ok(Boot {
+ login,
+ resume_point: resume_point.to_string(),
+ })
}
#[derive(serde::Serialize)]
struct Boot {
login: Login,
+ resume_point: String,
}
impl IntoResponse for Boot {
diff --git a/src/login/routes/test/boot.rs b/src/login/routes/test/boot.rs
index dee554f..9655354 100644
--- a/src/login/routes/test/boot.rs
+++ b/src/login/routes/test/boot.rs
@@ -1,9 +1,14 @@
+use axum::extract::State;
+
use crate::{login::routes, test::fixtures};
#[tokio::test]
async fn returns_identity() {
+ let app = fixtures::scratch_app().await;
let login = fixtures::login::fictitious();
- let response = routes::boot(login.clone()).await;
+ let response = routes::boot(State(app), login.clone())
+ .await
+ .expect("boot always succeeds");
assert_eq!(login, response.login);
}
diff --git a/src/repo/channel.rs b/src/repo/channel.rs
index efc2ced..ad42710 100644
--- a/src/repo/channel.rs
+++ b/src/repo/channel.rs
@@ -82,7 +82,10 @@ impl<'c> Channels<'c> {
Ok(channel)
}
- pub async fn all(&mut self) -> Result<Vec<Channel>, sqlx::Error> {
+ pub async fn all(
+ &mut self,
+ resume_point: Option<Sequence>,
+ ) -> Result<Vec<Channel>, sqlx::Error> {
let channels = sqlx::query_as!(
Channel,
r#"
@@ -92,8 +95,10 @@ impl<'c> Channels<'c> {
created_at as "created_at: DateTime",
created_sequence as "created_sequence: Sequence"
from channel
+ where coalesce(created_sequence <= $1, true)
order by channel.name
"#,
+ resume_point,
)
.fetch_all(&mut *self.0)
.await?;
diff --git a/src/repo/sequence.rs b/src/repo/sequence.rs
index 8fe9dab..c47b41c 100644
--- a/src/repo/sequence.rs
+++ b/src/repo/sequence.rs
@@ -1,3 +1,5 @@
+use std::fmt;
+
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
pub trait Provider {
@@ -10,6 +12,37 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
}
}
+pub struct Sequences<'t>(&'t mut SqliteConnection);
+
+impl<'c> Sequences<'c> {
+ pub async fn next(&mut self) -> Result<Sequence, sqlx::Error> {
+ let next = sqlx::query_scalar!(
+ r#"
+ update event_sequence
+ set last_value = last_value + 1
+ returning last_value as "next_value: Sequence"
+ "#,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(next)
+ }
+
+ pub async fn current(&mut self) -> Result<Sequence, sqlx::Error> {
+ let next = sqlx::query_scalar!(
+ r#"
+ select last_value as "last_value: Sequence"
+ from event_sequence
+ "#,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(next)
+ }
+}
+
#[derive(
Clone,
Copy,
@@ -26,20 +59,9 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
#[sqlx(transparent)]
pub struct Sequence(i64);
-pub struct Sequences<'t>(&'t mut SqliteConnection);
-
-impl<'c> Sequences<'c> {
- pub async fn next(&mut self) -> Result<Sequence, sqlx::Error> {
- let next = sqlx::query_scalar!(
- r#"
- update event_sequence
- set last_value = last_value + 1
- returning last_value as "next_value: Sequence"
- "#,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- Ok(next)
+impl fmt::Display for Sequence {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let Self(value) = self;
+ value.fmt(f)
}
}