summaryrefslogtreecommitdiff
path: root/src/push
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2025-10-24 19:10:29 -0400
committerOwen Jacobson <owen@grimoire.ca>2025-11-06 18:59:15 -0500
commit1f44cd930cdff94bb8cf04f645a5b035507438d9 (patch)
tree0f572a68498c156df4b3a8b0d669c3786c38888d /src/push
parente2a851f68aacd74a248e925ab334c3cf9eabba18 (diff)
Add an endpoint for creating push subscriptions.
The semantics of this endpoint are somewhat complex, and are incompletely captured in the associated docs change. For posterity, the intended workflow is: 1. Obtain Pilcrow's current VAPID key by connecting (it's in the events, either from boot or from the event stream). 2. Use the browser push APIs to create a push subscription, using that VAPID key. 3. Send Pilcrow the push subscription endpoint and keys, plus the VAPID key the client used to create it so that the server can detect race conditions with key rotation. 4. Wait for messages to arrive. This commit does not introduce any actual messages, just subscription management endpoints. When the server's VAPID key is rotated, all existing subscriptions are discarded. Without the VAPID key, the server cannot service those subscriptions. We can't exactly notify the broker to stop processing messages on those subscriptions, so this is an incomplete solution to what to do if the key is being rotated due to a compromise, but it's better than nothing. The shape of the API endpoint is heavily informed by the [JSON payload][web-push-json] provided by browser Web Push implementations, to ease client development in a browser-based context. The idea is that a client can take that JSON and send it to the server verbatim, without needing to transform it in any way, to submit the subscription to the server for use. [web-push-json]: https://developer.mozilla.org/en-US/docs/Web/API/PushSubscription/toJSON Push subscriptions are operationally associated with a specific _user agent_, and have no inherent relationship with a Pilcrow login or token (session). Taken as-is, a subscription created by user A could be reused by user B if they share a user agent, even if user A logs out before user B logs in. Pilcrow therefore _logically_ associates push subscriptions with specific tokens, and abandons those subscriptions when the token is invalidated by * logging out, * expiry, or * changing passwords. (There are no other token invalidation workflows at this time.) Stored subscriptions are also abandoned when the server's VAPID key changes.
Diffstat (limited to 'src/push')
-rw-r--r--src/push/app.rs76
-rw-r--r--src/push/handlers/mod.rs3
-rw-r--r--src/push/handlers/subscribe/mod.rs95
-rw-r--r--src/push/handlers/subscribe/test.rs236
-rw-r--r--src/push/mod.rs3
-rw-r--r--src/push/repo.rs114
6 files changed, 527 insertions, 0 deletions
diff --git a/src/push/app.rs b/src/push/app.rs
new file mode 100644
index 0000000..358a8cc
--- /dev/null
+++ b/src/push/app.rs
@@ -0,0 +1,76 @@
+use p256::ecdsa::VerifyingKey;
+use sqlx::SqlitePool;
+use web_push::SubscriptionInfo;
+
+use super::repo::Provider as _;
+use crate::{token::extract::Identity, vapid, vapid::repo::Provider as _};
+
+pub struct Push {
+ db: SqlitePool,
+}
+
+impl Push {
+ pub const fn new(db: SqlitePool) -> Self {
+ Self { db }
+ }
+
+ pub async fn subscribe(
+ &self,
+ subscriber: &Identity,
+ subscription: &SubscriptionInfo,
+ vapid: &VerifyingKey,
+ ) -> Result<(), SubscribeError> {
+ let mut tx = self.db.begin().await?;
+
+ let current = tx.vapid().current().await?;
+ if vapid != &current.key {
+ return Err(SubscribeError::StaleVapidKey(current.key));
+ }
+
+ match tx.push().create(&subscriber.token, subscription).await {
+ Ok(()) => (),
+ Err(err) => {
+ if let Some(err) = err.as_database_error()
+ && err.is_unique_violation()
+ {
+ let current = tx
+ .push()
+ .by_endpoint(&subscriber.login, &subscription.endpoint)
+ .await?;
+ // If we already have a subscription for this endpoint, with _different_
+ // parameters, then this is a client error. They shouldn't reuse endpoint URLs,
+ // per the various RFCs.
+ //
+ // However, if we have a subscription for this endpoint with the same parameters
+ // then we accept it and silently do nothing. This may happen if, for example,
+ // the subscribe request is retried due to a network interruption where it's
+ // not clear whether the original request succeeded.
+ if &current != subscription {
+ return Err(SubscribeError::Duplicate);
+ }
+ } else {
+ return Err(SubscribeError::Database(err));
+ }
+ }
+ }
+
+ tx.commit().await?;
+
+ Ok(())
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum SubscribeError {
+ #[error(transparent)]
+ Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Vapid(#[from] vapid::repo::Error),
+ #[error("subscription created with stale VAPID key")]
+ StaleVapidKey(VerifyingKey),
+ #[error("subscription already exists for endpoint")]
+ // The endpoint URL is not included in the error, as it is a bearer credential in its own right
+ // and we want to limit its proliferation. The only intended recipient of this message is the
+ // client, which already knows the endpoint anyways and doesn't need us to tell them.
+ Duplicate,
+}
diff --git a/src/push/handlers/mod.rs b/src/push/handlers/mod.rs
new file mode 100644
index 0000000..86eeea0
--- /dev/null
+++ b/src/push/handlers/mod.rs
@@ -0,0 +1,3 @@
+mod subscribe;
+
+pub use subscribe::handler as subscribe;
diff --git a/src/push/handlers/subscribe/mod.rs b/src/push/handlers/subscribe/mod.rs
new file mode 100644
index 0000000..d142df6
--- /dev/null
+++ b/src/push/handlers/subscribe/mod.rs
@@ -0,0 +1,95 @@
+use axum::{
+ extract::{Json, State},
+ http::StatusCode,
+ response::{IntoResponse, Response},
+};
+use p256::ecdsa::VerifyingKey;
+use web_push::SubscriptionInfo;
+
+use crate::{
+ error::Internal,
+ push::{app, app::Push},
+ token::extract::Identity,
+};
+
+#[cfg(test)]
+mod test;
+
+#[derive(Clone, serde::Deserialize)]
+pub struct Request {
+ subscription: Subscription,
+ #[serde(with = "crate::vapid::ser::key")]
+ vapid: VerifyingKey,
+}
+
+// This structure is described in <https://w3c.github.io/push-api/#dom-pushsubscription-tojson>.
+#[derive(Clone, serde::Deserialize)]
+pub struct Subscription {
+ endpoint: String,
+ keys: Keys,
+}
+
+// This structure is described in <https://w3c.github.io/push-api/#dom-pushsubscription-tojson>.
+#[derive(Clone, serde::Deserialize)]
+pub struct Keys {
+ p256dh: String,
+ auth: String,
+}
+
+pub async fn handler(
+ State(push): State<Push>,
+ identity: Identity,
+ Json(request): Json<Request>,
+) -> Result<StatusCode, Error> {
+ let Request {
+ subscription,
+ vapid,
+ } = request;
+
+ push.subscribe(&identity, &subscription.into(), &vapid)
+ .await?;
+
+ Ok(StatusCode::CREATED)
+}
+
+impl From<Subscription> for SubscriptionInfo {
+ fn from(request: Subscription) -> Self {
+ let Subscription {
+ endpoint,
+ keys: Keys { p256dh, auth },
+ } = request;
+ let info = SubscriptionInfo::new(endpoint, p256dh, auth);
+ info
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub struct Error(#[from] app::SubscribeError);
+
+impl IntoResponse for Error {
+ fn into_response(self) -> Response {
+ let Self(err) = self;
+
+ match err {
+ app::SubscribeError::StaleVapidKey(key) => {
+ let body = StaleVapidKey {
+ message: err.to_string(),
+ key,
+ };
+ (StatusCode::BAD_REQUEST, Json(body)).into_response()
+ }
+ app::SubscribeError::Duplicate => {
+ (StatusCode::CONFLICT, err.to_string()).into_response()
+ }
+ other => Internal::from(other).into_response(),
+ }
+ }
+}
+
+#[derive(serde::Serialize)]
+struct StaleVapidKey {
+ message: String,
+ #[serde(with = "crate::vapid::ser::key")]
+ key: VerifyingKey,
+}
diff --git a/src/push/handlers/subscribe/test.rs b/src/push/handlers/subscribe/test.rs
new file mode 100644
index 0000000..b72624d
--- /dev/null
+++ b/src/push/handlers/subscribe/test.rs
@@ -0,0 +1,236 @@
+use axum::{
+ extract::{Json, State},
+ http::StatusCode,
+};
+
+use crate::{
+ push::app::SubscribeError,
+ test::{fixtures, fixtures::event},
+};
+
+#[tokio::test]
+async fn accepts_new_subscription() {
+ let app = fixtures::scratch_app().await;
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ // Issue a VAPID key.
+
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("refreshing the VAPID key always succeeds");
+
+ // Find out what that VAPID key is.
+
+ let boot = app.boot().snapshot().await.expect("boot always succeeds");
+ let vapid = boot
+ .events
+ .into_iter()
+ .filter_map(event::vapid)
+ .filter_map(event::vapid::changed)
+ .next_back()
+ .expect("the application will have a vapid key after a refresh");
+
+ // Create a dummy subscription with that key.
+
+ let request = super::Request {
+ subscription: super::Subscription {
+ endpoint: String::from("https://push.example.com/endpoint"),
+ keys: super::Keys {
+ p256dh: String::from("test-p256dh-value"),
+ auth: String::from("test-auth-value"),
+ },
+ },
+ vapid: vapid.key,
+ };
+ let response = super::handler(State(app.push()), subscriber, Json(request))
+ .await
+ .expect("test request will succeed on a fresh app");
+
+ // Check that the response looks as expected.
+
+ assert_eq!(StatusCode::CREATED, response);
+}
+
+#[tokio::test]
+async fn accepts_repeat_subscription() {
+ let app = fixtures::scratch_app().await;
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ // Issue a VAPID key.
+
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("refreshing the VAPID key always succeeds");
+
+ // Find out what that VAPID key is.
+
+ let boot = app.boot().snapshot().await.expect("boot always succeeds");
+ let vapid = boot
+ .events
+ .into_iter()
+ .filter_map(event::vapid)
+ .filter_map(event::vapid::changed)
+ .next_back()
+ .expect("the application will have a vapid key after a refresh");
+
+ // Create a dummy subscription with that key.
+
+ let request = super::Request {
+ subscription: super::Subscription {
+ endpoint: String::from("https://push.example.com/endpoint"),
+ keys: super::Keys {
+ p256dh: String::from("test-p256dh-value"),
+ auth: String::from("test-auth-value"),
+ },
+ },
+ vapid: vapid.key,
+ };
+ let response = super::handler(State(app.push()), subscriber.clone(), Json(request.clone()))
+ .await
+ .expect("test request will succeed on a fresh app");
+
+ // Check that the response looks as expected.
+
+ assert_eq!(StatusCode::CREATED, response);
+
+ // Repeat the request
+
+ let response = super::handler(State(app.push()), subscriber, Json(request))
+ .await
+ .expect("test request will succeed twice on a fresh app");
+
+ // Check that the second response also looks as expected.
+
+ assert_eq!(StatusCode::CREATED, response);
+}
+
+#[tokio::test]
+async fn rejects_duplicate_subscription() {
+ let app = fixtures::scratch_app().await;
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ // Issue a VAPID key.
+
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("refreshing the VAPID key always succeeds");
+
+ // Find out what that VAPID key is.
+
+ let boot = app.boot().snapshot().await.expect("boot always succeeds");
+ let vapid = boot
+ .events
+ .into_iter()
+ .filter_map(event::vapid)
+ .filter_map(event::vapid::changed)
+ .next_back()
+ .expect("the application will have a vapid key after a refresh");
+
+ // Create a dummy subscription with that key.
+
+ let request = super::Request {
+ subscription: super::Subscription {
+ endpoint: String::from("https://push.example.com/endpoint"),
+ keys: super::Keys {
+ p256dh: String::from("test-p256dh-value"),
+ auth: String::from("test-auth-value"),
+ },
+ },
+ vapid: vapid.key,
+ };
+ super::handler(State(app.push()), subscriber.clone(), Json(request))
+ .await
+ .expect("test request will succeed on a fresh app");
+
+ // Repeat the request with different keys
+
+ let request = super::Request {
+ subscription: super::Subscription {
+ endpoint: String::from("https://push.example.com/endpoint"),
+ keys: super::Keys {
+ p256dh: String::from("different-test-p256dh-value"),
+ auth: String::from("different-test-auth-value"),
+ },
+ },
+ vapid: vapid.key,
+ };
+ let response = super::handler(State(app.push()), subscriber, Json(request))
+ .await
+ .expect_err("request with duplicate endpoint should fail");
+
+ // Make sure we got the error we expected.
+
+ assert!(matches!(response, super::Error(SubscribeError::Duplicate)));
+}
+
+#[tokio::test]
+async fn rejects_stale_vapid_key() {
+ let app = fixtures::scratch_app().await;
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ // Issue a VAPID key.
+
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("refreshing the VAPID key always succeeds");
+
+ // Find out what that VAPID key is.
+
+ let boot = app.boot().snapshot().await.expect("boot always succeeds");
+ let vapid = boot
+ .events
+ .into_iter()
+ .filter_map(event::vapid)
+ .filter_map(event::vapid::changed)
+ .next_back()
+ .expect("the application will have a vapid key after a refresh");
+
+ // Change the VAPID key.
+
+ app.vapid()
+ .rotate_key()
+ .await
+ .expect("key rotation always succeeds");
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("refreshing the VAPID key always succeeds");
+
+ // Find out what the new VAPID key is.
+
+ let boot = app.boot().snapshot().await.expect("boot always succeeds");
+ let fresh_vapid = boot
+ .events
+ .into_iter()
+ .filter_map(event::vapid)
+ .filter_map(event::vapid::changed)
+ .next_back()
+ .expect("the application will have a vapid key after a refresh");
+
+ // Create a dummy subscription with the original key.
+
+ let request = super::Request {
+ subscription: super::Subscription {
+ endpoint: String::from("https://push.example.com/endpoint"),
+ keys: super::Keys {
+ p256dh: String::from("test-p256dh-value"),
+ auth: String::from("test-auth-value"),
+ },
+ },
+ vapid: vapid.key,
+ };
+ let response = super::handler(State(app.push()), subscriber, Json(request))
+ .await
+ .expect_err("test request has a stale vapid key");
+
+ // Check that the response looks as expected.
+
+ assert!(matches!(
+ response,
+ super::Error(SubscribeError::StaleVapidKey(key)) if key == fresh_vapid.key
+ ));
+}
diff --git a/src/push/mod.rs b/src/push/mod.rs
new file mode 100644
index 0000000..1394ea4
--- /dev/null
+++ b/src/push/mod.rs
@@ -0,0 +1,3 @@
+pub mod app;
+pub mod handlers;
+pub mod repo;
diff --git a/src/push/repo.rs b/src/push/repo.rs
new file mode 100644
index 0000000..6c18c6e
--- /dev/null
+++ b/src/push/repo.rs
@@ -0,0 +1,114 @@
+use sqlx::{Sqlite, SqliteConnection, Transaction};
+use web_push::SubscriptionInfo;
+
+use crate::{login::Login, token::Token};
+
+pub trait Provider {
+ fn push(&mut self) -> Push<'_>;
+}
+
+impl Provider for Transaction<'_, Sqlite> {
+ fn push(&mut self) -> Push<'_> {
+ Push(self)
+ }
+}
+
+pub struct Push<'t>(&'t mut SqliteConnection);
+
+impl Push<'_> {
+ pub async fn create(
+ &mut self,
+ token: &Token,
+ subscription: &SubscriptionInfo,
+ ) -> Result<(), sqlx::Error> {
+ sqlx::query!(
+ r#"
+ insert into push_subscription (token, endpoint, p256dh, auth)
+ values ($1, $2, $3, $4)
+ "#,
+ token.id,
+ subscription.endpoint,
+ subscription.keys.p256dh,
+ subscription.keys.auth,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
+ pub async fn by_endpoint(
+ &mut self,
+ subscriber: &Login,
+ endpoint: &str,
+ ) -> Result<SubscriptionInfo, sqlx::Error> {
+ let row = sqlx::query!(
+ r#"
+ select
+ subscription.endpoint,
+ subscription.p256dh,
+ subscription.auth
+ from push_subscription as subscription
+ join token on subscription.token = token.id
+ join login as subscriber on token.login = subscriber.id
+ where subscriber.id = $1
+ and subscription.endpoint = $2
+ "#,
+ subscriber.id,
+ endpoint,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ let info = SubscriptionInfo::new(row.endpoint, row.p256dh, row.auth);
+
+ Ok(info)
+ }
+
+ pub async fn unsubscribe_token(&mut self, token: &Token) -> Result<(), sqlx::Error> {
+ sqlx::query!(
+ r#"
+ delete from push_subscription
+ where token = $1
+ "#,
+ token.id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
+ pub async fn unsubscribe_login(&mut self, login: &Login) -> Result<(), sqlx::Error> {
+ sqlx::query!(
+ r#"
+ with tokens as (
+ select id from token
+ where login = $1
+ )
+ delete from push_subscription
+ where token in tokens
+ "#,
+ login.id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
+ // Unsubscribe logic for token expiry lives in the `tokens` repository, for maintenance reasons.
+
+ pub async fn clear(&mut self) -> Result<(), sqlx::Error> {
+ // We assume that _all_ stored subscriptions are for a VAPID key we're about to delete.
+ sqlx::query!(
+ r#"
+ delete from push_subscription
+ "#,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+}