From b32c7682f0a84619a6d1845516a6a1829fa0c59b Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Feb 2026 16:41:37 -0500 Subject: Move failed push handling inside of the web push publisher. I want push publication to be "fire and forget," and ultimately also for it to be asynchronous and retriable. To facilitate that, the caller needs to be insulated from the final outcome of publishing a push message. I've opted to preserve the `Failure` possibility, but any delivery issues are now handled inside the publisher. --- src/cli.rs | 2 +- src/message/app.rs | 32 +------ src/push/app.rs | 48 +--------- src/push/handlers/ping/test.rs | 200 ----------------------------------------- src/push/publisher.rs | 66 +++++++++++--- src/test/webpush.rs | 29 ++---- ui/service-worker.js | 1 + 7 files changed, 69 insertions(+), 309 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 971d1f9..e0d8b75 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -101,7 +101,7 @@ impl Args { self.umask.set(); let pool = self.pool().await.fail("Failed to create database pool")?; - let publisher = Publisher::new().fail("Failed to create web push publisher")?; + let publisher = Publisher::new(pool.clone()).fail("Failed to create web push publisher")?; let app = App::from(pool, publisher); match self.command { diff --git a/src/message/app.rs b/src/message/app.rs index b79dad0..4bf96bf 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -1,7 +1,6 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use web_push::WebPushError; use super::{Body, History, Id, Message, history, repo::Provider as _}; use crate::{ @@ -184,39 +183,10 @@ where self.events.broadcast_from(events.clone()); for event in events { - let failures = self - .publisher + self.publisher .publish(Event::from(event), &signer, &push_recipients) .await .fail("Failed to publish push events")?; - - if !failures.is_empty() { - let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; - // Note that data integrity guarantees from the original transaction to read - // subscriptions may no longer be valid now. Time has passed. Depending on how slow - // delivering push notifications is, potentially a _lot_ of time has passed. - - for (sub, err) in failures { - match err { - // I _think_ this is the complete set of permanent failures. See - // for a complete - // list. - WebPushError::Unauthorized(_) - | WebPushError::InvalidUri - | WebPushError::EndpointNotValid(_) - | WebPushError::EndpointNotFound(_) - | WebPushError::InvalidCryptoKeys - | WebPushError::MissingCryptoKeys => { - tx.push().unsubscribe(sub).await.fail( - "Failed to unsubscribe after permanent push message rejection", - )?; - } - _ => (), - } - } - - tx.commit().await.fail(db::failed::COMMIT)?; - } } Ok(message.as_sent()) diff --git a/src/push/app.rs b/src/push/app.rs index f7846a6..1983055 100644 --- a/src/push/app.rs +++ b/src/push/app.rs @@ -1,6 +1,6 @@ use p256::ecdsa::VerifyingKey; use sqlx::SqlitePool; -use web_push::{SubscriptionInfo, WebPushError}; +use web_push::SubscriptionInfo; use super::repo::Provider as _; use crate::{ @@ -78,7 +78,7 @@ impl

Push

where P: Publish, { - pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> { + pub async fn ping(&self, recipient: &Login) -> Result<(), Failed> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let signer = tx @@ -99,44 +99,10 @@ where // we need to write. tx.commit().await.fail(db::failed::COMMIT)?; - let failures = self - .publisher + self.publisher .publish(Heartbeat::Heartbeat, &signer, &subscriptions) .await .fail("Failed to send push message")?; - - if !failures.is_empty() { - let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; - // Note that data integrity guarantees from the original transaction to read - // subscriptions may no longer be valid now. Time has passed. Depending on how slow - // delivering push notifications is, potentially a _lot_ of time has passed. - - for (sub, err) in &failures { - match err { - // I _think_ this is the complete set of permanent failures. See - // for a complete - // list. - WebPushError::Unauthorized(_) - | WebPushError::InvalidUri - | WebPushError::EndpointNotValid(_) - | WebPushError::EndpointNotFound(_) - | WebPushError::InvalidCryptoKeys - | WebPushError::MissingCryptoKeys => { - tx.push() - .unsubscribe(sub) - .await - .fail("Failed to unsubscribe after permanent push message rejection")?; - } - _ => (), - } - } - - tx.commit().await.fail(db::failed::COMMIT)?; - - return Err(PushError::Delivery( - failures.into_iter().map(|(_, err)| err).collect(), - )); - } Ok(()) } } @@ -153,11 +119,3 @@ pub enum SubscribeError { #[error(transparent)] Failed(#[from] Failed), } - -#[derive(Debug, thiserror::Error)] -pub enum PushError { - #[error("push message delivery failures: {0:?}")] - Delivery(Vec), - #[error(transparent)] - Failed(#[from] Failed), -} diff --git a/src/push/handlers/ping/test.rs b/src/push/handlers/ping/test.rs index cc07ef0..70ba4bf 100644 --- a/src/push/handlers/ping/test.rs +++ b/src/push/handlers/ping/test.rs @@ -172,203 +172,3 @@ async fn ping_recipient_only() { .any(|publish| publish.recipients.contains(&spectator_subscription)) ); } - -#[tokio::test] -async fn ping_permanent_error() { - let app = fixtures::scratch_app().await; - - let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - - let vapid = fixtures::vapid::key(&app).await; - - // Create subscriptions - let subscription = SubscriptionInfo::new( - "https://push.example.com/recipient/endpoint", - "recipient-p256dh-key", - "recipient-auth", - ); - app.push() - .subscribe(&recipient, &subscription, &vapid) - .await - .expect("creating a subscription succeeds"); - - // Prepare the next ping attempt to fail - app.publisher() - .fail_next(&subscription, WebPushError::InvalidUri); - - // Send a ping - super::handler( - State(app.push()), - recipient.clone(), - Json(super::Request {}), - ) - .await - .expect_err("sending a ping with a permanently-failing subscription fails"); - - // Confirm that it was actually sent - let sent = app.publisher().sent(); - - let subscriptions = HashSet::from([subscription.clone()]); - assert!( - sent.iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); - - // Send a second ping - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) - .await - .expect("sending a ping with no subscriptions always succeeds"); - - assert_eq!(StatusCode::ACCEPTED, response); - - // Confirm that it was _not_ sent this time, since it failed permanently last time - let sent = app.publisher().sent(); - - assert!( - !sent - .iter() - .any(|publish| publish.recipients.contains(&subscription)) - ); -} - -#[tokio::test] -async fn ping_temporary_error() { - let app = fixtures::scratch_app().await; - - let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - - let vapid = fixtures::vapid::key(&app).await; - - // Create subscriptions - let subscription = SubscriptionInfo::new( - "https://push.example.com/recipient/endpoint", - "recipient-p256dh-key", - "recipient-auth", - ); - app.push() - .subscribe(&recipient, &subscription, &vapid) - .await - .expect("creating a subscription succeeds"); - - // Prepare the next ping attempt to fail - app.publisher().fail_next( - &subscription, - WebPushError::from(io::Error::other("transient IO error")), - ); - - // Send a ping - super::handler( - State(app.push()), - recipient.clone(), - Json(super::Request {}), - ) - .await - .expect_err("sending a ping with a temporarily-failing subscription fails"); - - // Confirm that it was actually sent - let sent = app.publisher().sent(); - - let subscriptions = HashSet::from([subscription.clone()]); - assert!( - sent.iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); - - // Send a second ping - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) - .await - .expect("sending a ping subscription succeeds now that the transient error has cleared"); - assert_eq!(StatusCode::ACCEPTED, response); - - // Confirm that it was _not_ sent this time, since it failed permanently last time - let sent = app.publisher().sent(); - - let subscriptions = HashSet::from([subscription]); - assert!( - sent.iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); -} - -#[tokio::test] -async fn ping_multiple_subscriptions_with_failure() { - let app = fixtures::scratch_app().await; - - let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - let vapid = fixtures::vapid::key(&app).await; - - // Create subscriptions - let failing = SubscriptionInfo::new( - "https://push.example.com/endpoint-failing", - "testing-p256dh-key-failing", - "testing-auth-failing", - ); - let succeeding = SubscriptionInfo::new( - "https://push.example.com/endpoint-succeeding", - "testing-p256dh-key-succeeding", - "testing-auth-succeeding", - ); - let subscriptions = HashSet::from([failing.clone(), succeeding.clone()]); - for subscription in &subscriptions { - app.push() - .subscribe(&recipient, subscription, &vapid) - .await - .expect("creating a subscription succeeds"); - } - - // Rig one of them to fail permanently - app.publisher() - .fail_next(&failing, WebPushError::InvalidUri); - - // Send a ping - super::handler( - State(app.push()), - recipient.clone(), - Json(super::Request {}), - ) - .await - .expect_err("sending a ping with a failing subscription fails"); - - // Confirm that it was actually sent to both subs the first time - let subscriptions = HashSet::from([failing.clone(), succeeding.clone()]); - assert!( - app.publisher() - .sent() - .iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); - - // Send a second ping - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) - .await - .expect("sending a second ping succeeds after the failing subscription is removed"); - assert_eq!(StatusCode::ACCEPTED, response); - - // Confirm that it was only sent to the succeeding subscription - let subscriptions = HashSet::from([succeeding]); - let sent = app.publisher().sent(); - assert!( - sent.iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); - assert!( - !sent - .iter() - .any(|publish| publish.recipients.contains(&failing)) - ); -} diff --git a/src/push/publisher.rs b/src/push/publisher.rs index d6227a2..ef23f2f 100644 --- a/src/push/publisher.rs +++ b/src/push/publisher.rs @@ -1,34 +1,40 @@ use futures::future::join_all; use itertools::Itertools as _; use serde::Serialize; +use sqlx::SqlitePool; use web_push::{ ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder, }; -use crate::error::failed::{Failed, ResultExt as _}; +use crate::{ + db, + error::failed::{Failed, ResultExt as _}, + push::repo::Provider, +}; #[async_trait::async_trait] pub trait Publish { - async fn publish<'s, M>( + async fn publish( &self, message: M, signer: &PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator + Send, - ) -> Result, Failed> + subscriptions: impl IntoIterator + Send, + ) -> Result<(), Failed> where M: Serialize + Send + 'static; } #[derive(Clone)] pub struct Publisher { + db: SqlitePool, client: IsahcWebPushClient, } impl Publisher { - pub fn new() -> Result { + pub fn new(db: SqlitePool) -> Result { let client = IsahcWebPushClient::new()?; - Ok(Self { client }) + Ok(Self { db, client }) } fn prepare_message( @@ -49,16 +55,52 @@ impl Publisher { Ok(message) } + + async fn settle_failed( + &self, + failures: Vec<(&SubscriptionInfo, WebPushError)>, + ) -> Result<(), Failed> { + if !failures.is_empty() { + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + // Note that data integrity guarantees from whatever transaction originally read the + // subscriptions may no longer be valid now. Time has passed. Depending on how slow + // delivering push notifications is, potentially a _lot_ of time has passed. + + for (sub, err) in &failures { + match err { + // I _think_ this is the complete set of permanent failures. See + // for a complete + // list. + WebPushError::Unauthorized(_) + | WebPushError::InvalidUri + | WebPushError::EndpointNotValid(_) + | WebPushError::EndpointNotFound(_) + | WebPushError::InvalidCryptoKeys + | WebPushError::MissingCryptoKeys => { + tx.push() + .unsubscribe(sub) + .await + .fail("Failed to unsubscribe after permanent push message rejection")?; + } + _ => (), + } + } + + tx.commit().await.fail(db::failed::COMMIT)?; + } + + Ok(()) + } } #[async_trait::async_trait] impl Publish for Publisher { - async fn publish<'s, M>( + async fn publish( &self, message: M, signer: &PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator + Send, - ) -> Result, Failed> + subscriptions: impl IntoIterator + Send, + ) -> Result<(), Failed> where M: Serialize + Send + 'static, { @@ -74,12 +116,14 @@ impl Publish for Publisher { .into_iter() .map(async |(sub, message)| (sub, self.client.send(message).await)); - let failures = join_all(deliveries) + let failures: Vec<_> = join_all(deliveries) .await .into_iter() .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) .collect(); - Ok(failures) + self.settle_failed(failures).await?; + + Ok(()) } } diff --git a/src/test/webpush.rs b/src/test/webpush.rs index 55caf19..656f66a 100644 --- a/src/test/webpush.rs +++ b/src/test/webpush.rs @@ -1,11 +1,11 @@ use std::{ any::Any, - collections::{HashMap, HashSet}, + collections::HashSet, mem, sync::{Arc, Mutex, MutexGuard}, }; -use web_push::{PartialVapidSignatureBuilder, SubscriptionInfo, WebPushError}; +use web_push::{PartialVapidSignatureBuilder, SubscriptionInfo}; use crate::{error::failed::Failed, push::Publish}; @@ -15,7 +15,6 @@ pub struct Client(Arc>); #[derive(Default)] struct ClientInner { sent: Vec, - planned_failures: HashMap, } impl Client { @@ -33,35 +32,23 @@ impl Client { let sent = &mut self.inner().sent; mem::take(sent) } - - pub fn fail_next(&self, subscription_info: &SubscriptionInfo, err: WebPushError) { - let planned_failures = &mut self.inner().planned_failures; - planned_failures.insert(subscription_info.clone(), err); - } } #[async_trait::async_trait] impl Publish for Client { - async fn publish<'s, M>( + async fn publish( &self, message: M, _: &PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator + Send, - ) -> Result, Failed> + subscriptions: impl IntoIterator + Send, + ) -> Result<(), Failed> where M: Send + 'static, { let mut inner = self.inner(); - let message: Box = Box::new(message); - let mut recipients = HashSet::new(); - let mut failures = Vec::new(); - for subscription in subscriptions { - recipients.insert(subscription.clone()); - if let Some(err) = inner.planned_failures.remove(subscription) { - failures.push((subscription, err)); - } - } + let message: Box = Box::new(message); + let recipients = subscriptions.into_iter().cloned().collect(); let publication = Publication { message, @@ -69,7 +56,7 @@ impl Publish for Client { }; inner.sent.push(publication); - Ok(failures) + Ok(()) } } diff --git a/ui/service-worker.js b/ui/service-worker.js index 81d0752..8a889ec 100644 --- a/ui/service-worker.js +++ b/ui/service-worker.js @@ -27,6 +27,7 @@ function countUnreadChannels() { self.addEventListener('push', (event) => { const data = event.data.json(); + if (data.type === 'heartbeat') { // Let's show a notification right away so Safari doesn't tell Apple to be // mad at us: -- cgit v1.2.3