diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-12-09 15:13:21 -0500 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-12-17 15:48:20 -0500 |
| commit | 3c697f5fb1b8dbad46eac8fa299ed7cebfb36159 (patch) | |
| tree | b7854fb23d1e104f928acfe3bba75ea3b74b83d9 /src/push | |
| parent | 41a5a0f7e13bf5a82aaef59e34eb68f0fe7fa7f5 (diff) | |
Factor push message publication out to its own helper component.
The `Publisher` component handles the details of web push delivery. Callers must provide the subscription set, the current signer, and the message, while the publisher handles encoding and communication with web push endpoints.
To facilitate testing, `Publisher` implements `Publish`, which is a new trait with the same interface. Components that might publish web push messages should rely on the trait where possible. The test suite now constructs an app with a dummy `Publish` impl, which captures push messages for examination.
Note that the testing implementation of `Publish` is hand-crafted, and presently only acts to record the arguments it receives. The other alternative was to use a mocking library, such as `mockit`, and while I've used that approach before, I'm not super comfortable with the complexity in this situation. I think we can maintain a more reasonable testing `Publish` impl by hand, at least for now, and we can revisit that decision later if need be.
Tests for the `ping` endpoint have been migrated to this endpoint.
Diffstat (limited to 'src/push')
| -rw-r--r-- | src/push/app.rs | 68 | ||||
| -rw-r--r-- | src/push/handlers/ping/mod.rs | 9 | ||||
| -rw-r--r-- | src/push/handlers/ping/test.rs | 18 | ||||
| -rw-r--r-- | src/push/handlers/subscribe/test.rs | 95 | ||||
| -rw-r--r-- | src/push/mod.rs | 3 | ||||
| -rw-r--r-- | src/push/publisher.rs | 83 |
6 files changed, 138 insertions, 138 deletions
diff --git a/src/push/app.rs b/src/push/app.rs index 2bd6c25..ebfc220 100644 --- a/src/push/app.rs +++ b/src/push/app.rs @@ -1,29 +1,26 @@ -use futures::future::join_all; -use itertools::Itertools as _; use p256::ecdsa::VerifyingKey; use sqlx::SqlitePool; -use web_push::{ - ContentEncoding, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError, - WebPushMessage, WebPushMessageBuilder, -}; +use web_push::{SubscriptionInfo, WebPushError}; use super::repo::Provider as _; use crate::{ db, error::failed::{ErrorExt as _, Failed, ResultExt as _}, + event::Heartbeat, login::Login, + push::publisher::Publish, token::extract::Identity, vapid::repo::Provider as _, }; pub struct Push<P> { db: SqlitePool, - webpush: P, + publisher: P, } impl<P> Push<P> { - pub const fn new(db: SqlitePool, webpush: P) -> Self { - Self { db, webpush } + pub const fn new(db: SqlitePool, publisher: P) -> Self { + Self { db, publisher } } pub async fn subscribe( @@ -79,28 +76,8 @@ impl<P> Push<P> { impl<P> Push<P> where - P: WebPushClient, + P: Publish, { - fn prepare_ping( - signer: &PartialVapidSignatureBuilder, - subscription: &SubscriptionInfo, - ) -> Result<WebPushMessage, PushError> { - let signature = signer - .clone() - .add_sub_info(subscription) - .build() - .fail("Failed to build VAPID signature")?; - - let payload = "ping".as_bytes(); - - let mut message = WebPushMessageBuilder::new(subscription); - message.set_payload(ContentEncoding::Aes128Gcm, payload); - message.set_vapid_signature(signature); - let message = message.build().fail("Failed to build push message")?; - - Ok(message) - } - pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; @@ -116,22 +93,24 @@ where ) })?; - let pings: Vec<_> = subscriptions - .into_iter() - .map(|sub| Self::prepare_ping(&signer, &sub).map(|message| (sub, message))) - .try_collect()?; - - let deliveries = pings - .into_iter() - .map(async |(sub, message)| (sub, self.webpush.send(message).await)); + // We're about to perform some fairly IO-intensive outside actions. Holding a tx open + // across them raises the risk that other clients will encounter errors due to locks from + // this transaction, so release it here. We'll open a new transaction if there's something + // we need to write. + tx.commit().await.fail(db::failed::COMMIT)?; - let failures: Vec<_> = join_all(deliveries) + let failures = self + .publisher + .publish(Heartbeat::Heartbeat, signer, subscriptions) .await - .into_iter() - .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) - .collect(); + .fail("Failed to send push message")?; if !failures.is_empty() { + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + // Note that data integrity guarantees from the original transaction to read + // subscriptions may no longer be valid now. Time has passed. Depending on how slow + // delivering push notifications is, potentially a _lot_ of time has passed. + for (sub, err) in &failures { match err { // I _think_ this is the complete set of permanent failures. See @@ -152,13 +131,12 @@ where } } + tx.commit().await.fail(db::failed::COMMIT)?; + return Err(PushError::Delivery( failures.into_iter().map(|(_, err)| err).collect(), )); } - - tx.commit().await.fail(db::failed::COMMIT)?; - Ok(()) } } diff --git a/src/push/handlers/ping/mod.rs b/src/push/handlers/ping/mod.rs index db828fa..2a86984 100644 --- a/src/push/handlers/ping/mod.rs +++ b/src/push/handlers/ping/mod.rs @@ -1,7 +1,10 @@ use axum::{Json, extract::State, http::StatusCode}; -use web_push::WebPushClient; -use crate::{error::Internal, push::app::Push, token::extract::Identity}; +use crate::{ + error::Internal, + push::{Publish, app::Push}, + token::extract::Identity, +}; #[cfg(test)] mod test; @@ -15,7 +18,7 @@ pub async fn handler<P>( Json(_): Json<Request>, ) -> Result<StatusCode, Internal> where - P: WebPushClient, + P: Publish, { push.ping(&identity.login).await?; diff --git a/src/push/handlers/ping/test.rs b/src/push/handlers/ping/test.rs index 5725131..3481139 100644 --- a/src/push/handlers/ping/test.rs +++ b/src/push/handlers/ping/test.rs @@ -2,8 +2,9 @@ use axum::{ extract::{Json, State}, http::StatusCode, }; +use itertools::Itertools; -use crate::test::fixtures; +use crate::{event::Heartbeat, test::fixtures}; #[tokio::test] async fn ping_without_subscriptions() { @@ -11,18 +12,21 @@ async fn ping_without_subscriptions() { let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) .await .expect("sending a ping with no subscriptions always succeeds"); assert_eq!(StatusCode::ACCEPTED, response); - assert!(app.webpush().sent().is_empty()); + assert!( + app.publisher() + .sent() + .into_iter() + .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) + && publish.subscriptions.is_empty()) + .exactly_one() + .is_ok() + ); } // More complete testing requires that we figure out how to generate working p256 ECDH keys for diff --git a/src/push/handlers/subscribe/test.rs b/src/push/handlers/subscribe/test.rs index 1bc37a4..793bcef 100644 --- a/src/push/handlers/subscribe/test.rs +++ b/src/push/handlers/subscribe/test.rs @@ -3,33 +3,16 @@ use axum::{ http::StatusCode, }; -use crate::{ - push::app::SubscribeError, - test::{fixtures, fixtures::event}, -}; +use crate::{push::app::SubscribeError, test::fixtures}; #[tokio::test] async fn accepts_new_subscription() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - // Find out what that VAPID key is. - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with that key. @@ -41,7 +24,7 @@ async fn accepts_new_subscription() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; let response = super::handler(State(app.push()), subscriber, Json(request)) .await @@ -57,23 +40,9 @@ async fn accepts_repeat_subscription() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - // Find out what that VAPID key is. - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with that key. @@ -85,7 +54,7 @@ async fn accepts_repeat_subscription() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; let response = super::handler(State(app.push()), subscriber.clone(), Json(request.clone())) .await @@ -111,23 +80,9 @@ async fn rejects_duplicate_subscription() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - // Find out what that VAPID key is. - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with that key. @@ -139,7 +94,7 @@ async fn rejects_duplicate_subscription() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; super::handler(State(app.push()), subscriber.clone(), Json(request)) .await @@ -155,7 +110,7 @@ async fn rejects_duplicate_subscription() { auth: String::from("different-test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; let response = super::handler(State(app.push()), subscriber, Json(request)) .await @@ -170,24 +125,7 @@ async fn rejects_duplicate_subscription() { async fn rejects_stale_vapid_key() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - - // Find out what that VAPID key is. - - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let stale_vapid = fixtures::vapid::key(&app).await; // Change the VAPID key. @@ -200,16 +138,7 @@ async fn rejects_stale_vapid_key() { .await .expect("refreshing the VAPID key always succeeds"); - // Find out what the new VAPID key is. - - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let fresh_vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let fresh_vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with the original key. @@ -221,7 +150,7 @@ async fn rejects_stale_vapid_key() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid: stale_vapid, }; let response = super::handler(State(app.push()), subscriber, Json(request)) .await @@ -231,6 +160,6 @@ async fn rejects_stale_vapid_key() { assert!(matches!( response, - super::Error(SubscribeError::StaleVapidKey(key)) if key == fresh_vapid.key + super::Error(SubscribeError::StaleVapidKey(key)) if key == fresh_vapid )); } diff --git a/src/push/mod.rs b/src/push/mod.rs index 1394ea4..042991f 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -1,3 +1,6 @@ pub mod app; pub mod handlers; +mod publisher; pub mod repo; + +pub use publisher::{Publish, Publisher}; diff --git a/src/push/publisher.rs b/src/push/publisher.rs new file mode 100644 index 0000000..4092724 --- /dev/null +++ b/src/push/publisher.rs @@ -0,0 +1,83 @@ +use futures::future::join_all; +use itertools::Itertools as _; +use serde::Serialize; +use web_push::{ + ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo, + WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder, +}; + +use crate::error::failed::{Failed, ResultExt as _}; + +pub trait Publish { + fn publish<M>( + &self, + message: M, + signer: PartialVapidSignatureBuilder, + subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send, + ) -> impl Future<Output = Result<Vec<(SubscriptionInfo, WebPushError)>, Failed>> + Send + where + M: Serialize + Send + 'static; +} + +#[derive(Clone)] +pub struct Publisher { + client: IsahcWebPushClient, +} + +impl Publisher { + pub fn new() -> Result<Self, WebPushError> { + let client = IsahcWebPushClient::new()?; + Ok(Self { client }) + } + + fn prepare_message( + payload: &[u8], + signer: &PartialVapidSignatureBuilder, + subscription: &SubscriptionInfo, + ) -> Result<WebPushMessage, Failed> { + let signature = signer + .clone() + .add_sub_info(subscription) + .build() + .fail("Failed to build VAPID signature")?; + + let mut message = WebPushMessageBuilder::new(subscription); + message.set_payload(ContentEncoding::Aes128Gcm, payload); + message.set_vapid_signature(signature); + let message = message.build().fail("Failed to build push message")?; + + Ok(message) + } +} + +impl Publish for Publisher { + async fn publish<M>( + &self, + message: M, + signer: PartialVapidSignatureBuilder, + subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send, + ) -> Result<Vec<(SubscriptionInfo, WebPushError)>, Failed> + where + M: Serialize + Send + 'static, + { + let payload = serde_json::to_vec_pretty(&message) + .fail("Failed to encode web push message to JSON")?; + + let messages: Vec<_> = subscriptions + .into_iter() + .map(|sub| Self::prepare_message(&payload, &signer, &sub).map(|message| (sub, message))) + .try_collect()?; + + let deliveries = messages + .into_iter() + .map(async |(sub, message)| (sub, self.client.send(message).await)); + + let failures = join_all(deliveries) + .await + .into_iter() + .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) + .collect(); + + Ok(failures) + } +} |
