summaryrefslogtreecommitdiff
path: root/src/push/publisher.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2026-02-27 16:41:37 -0500
committerOwen Jacobson <owen@grimoire.ca>2026-02-27 18:15:33 -0500
commitb32c7682f0a84619a6d1845516a6a1829fa0c59b (patch)
treefad8bf5bd8eb628253611f0b3d1a91febe3b9266 /src/push/publisher.rs
parent6ab0f42250294e38e8da6a48260ff83544a6be9a (diff)
Move failed push handling inside of the web push publisher.HEADmain
I want push publication to be "fire and forget," and ultimately also for it to be asynchronous and retriable. To facilitate that, the caller needs to be insulated from the final outcome of publishing a push message. I've opted to preserve the `Failure` possibility, but any delivery issues are now handled inside the publisher.
Diffstat (limited to 'src/push/publisher.rs')
-rw-r--r--src/push/publisher.rs66
1 files changed, 55 insertions, 11 deletions
diff --git a/src/push/publisher.rs b/src/push/publisher.rs
index d6227a2..ef23f2f 100644
--- a/src/push/publisher.rs
+++ b/src/push/publisher.rs
@@ -1,34 +1,40 @@
use futures::future::join_all;
use itertools::Itertools as _;
use serde::Serialize;
+use sqlx::SqlitePool;
use web_push::{
ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo,
WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder,
};
-use crate::error::failed::{Failed, ResultExt as _};
+use crate::{
+ db,
+ error::failed::{Failed, ResultExt as _},
+ push::repo::Provider,
+};
#[async_trait::async_trait]
pub trait Publish {
- async fn publish<'s, M>(
+ async fn publish<M>(
&self,
message: M,
signer: &PartialVapidSignatureBuilder,
- subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send,
- ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed>
+ subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send,
+ ) -> Result<(), Failed>
where
M: Serialize + Send + 'static;
}
#[derive(Clone)]
pub struct Publisher {
+ db: SqlitePool,
client: IsahcWebPushClient,
}
impl Publisher {
- pub fn new() -> Result<Self, WebPushError> {
+ pub fn new(db: SqlitePool) -> Result<Self, WebPushError> {
let client = IsahcWebPushClient::new()?;
- Ok(Self { client })
+ Ok(Self { db, client })
}
fn prepare_message(
@@ -49,16 +55,52 @@ impl Publisher {
Ok(message)
}
+
+ async fn settle_failed(
+ &self,
+ failures: Vec<(&SubscriptionInfo, WebPushError)>,
+ ) -> Result<(), Failed> {
+ if !failures.is_empty() {
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
+ // Note that data integrity guarantees from whatever transaction originally read the
+ // subscriptions may no longer be valid now. Time has passed. Depending on how slow
+ // delivering push notifications is, potentially a _lot_ of time has passed.
+
+ for (sub, err) in &failures {
+ match err {
+ // I _think_ this is the complete set of permanent failures. See
+ // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete
+ // list.
+ WebPushError::Unauthorized(_)
+ | WebPushError::InvalidUri
+ | WebPushError::EndpointNotValid(_)
+ | WebPushError::EndpointNotFound(_)
+ | WebPushError::InvalidCryptoKeys
+ | WebPushError::MissingCryptoKeys => {
+ tx.push()
+ .unsubscribe(sub)
+ .await
+ .fail("Failed to unsubscribe after permanent push message rejection")?;
+ }
+ _ => (),
+ }
+ }
+
+ tx.commit().await.fail(db::failed::COMMIT)?;
+ }
+
+ Ok(())
+ }
}
#[async_trait::async_trait]
impl Publish for Publisher {
- async fn publish<'s, M>(
+ async fn publish<M>(
&self,
message: M,
signer: &PartialVapidSignatureBuilder,
- subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send,
- ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed>
+ subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send,
+ ) -> Result<(), Failed>
where
M: Serialize + Send + 'static,
{
@@ -74,12 +116,14 @@ impl Publish for Publisher {
.into_iter()
.map(async |(sub, message)| (sub, self.client.send(message).await));
- let failures = join_all(deliveries)
+ let failures: Vec<_> = join_all(deliveries)
.await
.into_iter()
.filter_map(|(sub, result)| result.err().map(|err| (sub, err)))
.collect();
- Ok(failures)
+ self.settle_failed(failures).await?;
+
+ Ok(())
}
}