diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2026-02-27 16:41:37 -0500 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2026-02-27 18:15:33 -0500 |
| commit | b32c7682f0a84619a6d1845516a6a1829fa0c59b (patch) | |
| tree | fad8bf5bd8eb628253611f0b3d1a91febe3b9266 /src/push | |
| parent | 6ab0f42250294e38e8da6a48260ff83544a6be9a (diff) | |
I want push publication to be "fire and forget," and ultimately also for it to be asynchronous and retriable. To facilitate that, the caller needs to be insulated from the final outcome of publishing a push message. I've opted to preserve the `Failure` possibility, but any delivery issues are now handled inside the publisher.
Diffstat (limited to 'src/push')
| -rw-r--r-- | src/push/app.rs | 48 | ||||
| -rw-r--r-- | src/push/handlers/ping/test.rs | 200 | ||||
| -rw-r--r-- | src/push/publisher.rs | 66 |
3 files changed, 58 insertions, 256 deletions
diff --git a/src/push/app.rs b/src/push/app.rs index f7846a6..1983055 100644 --- a/src/push/app.rs +++ b/src/push/app.rs @@ -1,6 +1,6 @@ use p256::ecdsa::VerifyingKey; use sqlx::SqlitePool; -use web_push::{SubscriptionInfo, WebPushError}; +use web_push::SubscriptionInfo; use super::repo::Provider as _; use crate::{ @@ -78,7 +78,7 @@ impl<P> Push<P> where P: Publish, { - pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> { + pub async fn ping(&self, recipient: &Login) -> Result<(), Failed> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let signer = tx @@ -99,44 +99,10 @@ where // we need to write. tx.commit().await.fail(db::failed::COMMIT)?; - let failures = self - .publisher + self.publisher .publish(Heartbeat::Heartbeat, &signer, &subscriptions) .await .fail("Failed to send push message")?; - - if !failures.is_empty() { - let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; - // Note that data integrity guarantees from the original transaction to read - // subscriptions may no longer be valid now. Time has passed. Depending on how slow - // delivering push notifications is, potentially a _lot_ of time has passed. - - for (sub, err) in &failures { - match err { - // I _think_ this is the complete set of permanent failures. See - // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete - // list. - WebPushError::Unauthorized(_) - | WebPushError::InvalidUri - | WebPushError::EndpointNotValid(_) - | WebPushError::EndpointNotFound(_) - | WebPushError::InvalidCryptoKeys - | WebPushError::MissingCryptoKeys => { - tx.push() - .unsubscribe(sub) - .await - .fail("Failed to unsubscribe after permanent push message rejection")?; - } - _ => (), - } - } - - tx.commit().await.fail(db::failed::COMMIT)?; - - return Err(PushError::Delivery( - failures.into_iter().map(|(_, err)| err).collect(), - )); - } Ok(()) } } @@ -153,11 +119,3 @@ pub enum SubscribeError { #[error(transparent)] Failed(#[from] Failed), } - -#[derive(Debug, thiserror::Error)] -pub enum PushError { - #[error("push message delivery failures: {0:?}")] - Delivery(Vec<WebPushError>), - #[error(transparent)] - Failed(#[from] Failed), -} diff --git a/src/push/handlers/ping/test.rs b/src/push/handlers/ping/test.rs index cc07ef0..70ba4bf 100644 --- a/src/push/handlers/ping/test.rs +++ b/src/push/handlers/ping/test.rs @@ -172,203 +172,3 @@ async fn ping_recipient_only() { .any(|publish| publish.recipients.contains(&spectator_subscription)) ); } - -#[tokio::test] -async fn ping_permanent_error() { - let app = fixtures::scratch_app().await; - - let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - - let vapid = fixtures::vapid::key(&app).await; - - // Create subscriptions - let subscription = SubscriptionInfo::new( - "https://push.example.com/recipient/endpoint", - "recipient-p256dh-key", - "recipient-auth", - ); - app.push() - .subscribe(&recipient, &subscription, &vapid) - .await - .expect("creating a subscription succeeds"); - - // Prepare the next ping attempt to fail - app.publisher() - .fail_next(&subscription, WebPushError::InvalidUri); - - // Send a ping - super::handler( - State(app.push()), - recipient.clone(), - Json(super::Request {}), - ) - .await - .expect_err("sending a ping with a permanently-failing subscription fails"); - - // Confirm that it was actually sent - let sent = app.publisher().sent(); - - let subscriptions = HashSet::from([subscription.clone()]); - assert!( - sent.iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); - - // Send a second ping - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) - .await - .expect("sending a ping with no subscriptions always succeeds"); - - assert_eq!(StatusCode::ACCEPTED, response); - - // Confirm that it was _not_ sent this time, since it failed permanently last time - let sent = app.publisher().sent(); - - assert!( - !sent - .iter() - .any(|publish| publish.recipients.contains(&subscription)) - ); -} - -#[tokio::test] -async fn ping_temporary_error() { - let app = fixtures::scratch_app().await; - - let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - - let vapid = fixtures::vapid::key(&app).await; - - // Create subscriptions - let subscription = SubscriptionInfo::new( - "https://push.example.com/recipient/endpoint", - "recipient-p256dh-key", - "recipient-auth", - ); - app.push() - .subscribe(&recipient, &subscription, &vapid) - .await - .expect("creating a subscription succeeds"); - - // Prepare the next ping attempt to fail - app.publisher().fail_next( - &subscription, - WebPushError::from(io::Error::other("transient IO error")), - ); - - // Send a ping - super::handler( - State(app.push()), - recipient.clone(), - Json(super::Request {}), - ) - .await - .expect_err("sending a ping with a temporarily-failing subscription fails"); - - // Confirm that it was actually sent - let sent = app.publisher().sent(); - - let subscriptions = HashSet::from([subscription.clone()]); - assert!( - sent.iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); - - // Send a second ping - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) - .await - .expect("sending a ping subscription succeeds now that the transient error has cleared"); - assert_eq!(StatusCode::ACCEPTED, response); - - // Confirm that it was _not_ sent this time, since it failed permanently last time - let sent = app.publisher().sent(); - - let subscriptions = HashSet::from([subscription]); - assert!( - sent.iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); -} - -#[tokio::test] -async fn ping_multiple_subscriptions_with_failure() { - let app = fixtures::scratch_app().await; - - let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - let vapid = fixtures::vapid::key(&app).await; - - // Create subscriptions - let failing = SubscriptionInfo::new( - "https://push.example.com/endpoint-failing", - "testing-p256dh-key-failing", - "testing-auth-failing", - ); - let succeeding = SubscriptionInfo::new( - "https://push.example.com/endpoint-succeeding", - "testing-p256dh-key-succeeding", - "testing-auth-succeeding", - ); - let subscriptions = HashSet::from([failing.clone(), succeeding.clone()]); - for subscription in &subscriptions { - app.push() - .subscribe(&recipient, subscription, &vapid) - .await - .expect("creating a subscription succeeds"); - } - - // Rig one of them to fail permanently - app.publisher() - .fail_next(&failing, WebPushError::InvalidUri); - - // Send a ping - super::handler( - State(app.push()), - recipient.clone(), - Json(super::Request {}), - ) - .await - .expect_err("sending a ping with a failing subscription fails"); - - // Confirm that it was actually sent to both subs the first time - let subscriptions = HashSet::from([failing.clone(), succeeding.clone()]); - assert!( - app.publisher() - .sent() - .iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); - - // Send a second ping - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) - .await - .expect("sending a second ping succeeds after the failing subscription is removed"); - assert_eq!(StatusCode::ACCEPTED, response); - - // Confirm that it was only sent to the succeeding subscription - let subscriptions = HashSet::from([succeeding]); - let sent = app.publisher().sent(); - assert!( - sent.iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); - assert!( - !sent - .iter() - .any(|publish| publish.recipients.contains(&failing)) - ); -} diff --git a/src/push/publisher.rs b/src/push/publisher.rs index d6227a2..ef23f2f 100644 --- a/src/push/publisher.rs +++ b/src/push/publisher.rs @@ -1,34 +1,40 @@ use futures::future::join_all; use itertools::Itertools as _; use serde::Serialize; +use sqlx::SqlitePool; use web_push::{ ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder, }; -use crate::error::failed::{Failed, ResultExt as _}; +use crate::{ + db, + error::failed::{Failed, ResultExt as _}, + push::repo::Provider, +}; #[async_trait::async_trait] pub trait Publish { - async fn publish<'s, M>( + async fn publish<M>( &self, message: M, signer: &PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send, - ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed> + subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send, + ) -> Result<(), Failed> where M: Serialize + Send + 'static; } #[derive(Clone)] pub struct Publisher { + db: SqlitePool, client: IsahcWebPushClient, } impl Publisher { - pub fn new() -> Result<Self, WebPushError> { + pub fn new(db: SqlitePool) -> Result<Self, WebPushError> { let client = IsahcWebPushClient::new()?; - Ok(Self { client }) + Ok(Self { db, client }) } fn prepare_message( @@ -49,16 +55,52 @@ impl Publisher { Ok(message) } + + async fn settle_failed( + &self, + failures: Vec<(&SubscriptionInfo, WebPushError)>, + ) -> Result<(), Failed> { + if !failures.is_empty() { + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + // Note that data integrity guarantees from whatever transaction originally read the + // subscriptions may no longer be valid now. Time has passed. Depending on how slow + // delivering push notifications is, potentially a _lot_ of time has passed. + + for (sub, err) in &failures { + match err { + // I _think_ this is the complete set of permanent failures. See + // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete + // list. + WebPushError::Unauthorized(_) + | WebPushError::InvalidUri + | WebPushError::EndpointNotValid(_) + | WebPushError::EndpointNotFound(_) + | WebPushError::InvalidCryptoKeys + | WebPushError::MissingCryptoKeys => { + tx.push() + .unsubscribe(sub) + .await + .fail("Failed to unsubscribe after permanent push message rejection")?; + } + _ => (), + } + } + + tx.commit().await.fail(db::failed::COMMIT)?; + } + + Ok(()) + } } #[async_trait::async_trait] impl Publish for Publisher { - async fn publish<'s, M>( + async fn publish<M>( &self, message: M, signer: &PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send, - ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed> + subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send, + ) -> Result<(), Failed> where M: Serialize + Send + 'static, { @@ -74,12 +116,14 @@ impl Publish for Publisher { .into_iter() .map(async |(sub, message)| (sub, self.client.send(message).await)); - let failures = join_all(deliveries) + let failures: Vec<_> = join_all(deliveries) .await .into_iter() .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) .collect(); - Ok(failures) + self.settle_failed(failures).await?; + + Ok(()) } } |
