diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2026-02-27 16:41:37 -0500 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2026-02-27 18:15:33 -0500 |
| commit | b32c7682f0a84619a6d1845516a6a1829fa0c59b (patch) | |
| tree | fad8bf5bd8eb628253611f0b3d1a91febe3b9266 /src/push/publisher.rs | |
| parent | 6ab0f42250294e38e8da6a48260ff83544a6be9a (diff) | |
I want push publication to be "fire and forget," and ultimately also for it to be asynchronous and retriable. To facilitate that, the caller needs to be insulated from the final outcome of publishing a push message. I've opted to preserve the `Failure` possibility, but any delivery issues are now handled inside the publisher.
Diffstat (limited to 'src/push/publisher.rs')
| -rw-r--r-- | src/push/publisher.rs | 66 |
1 files changed, 55 insertions, 11 deletions
diff --git a/src/push/publisher.rs b/src/push/publisher.rs index d6227a2..ef23f2f 100644 --- a/src/push/publisher.rs +++ b/src/push/publisher.rs @@ -1,34 +1,40 @@ use futures::future::join_all; use itertools::Itertools as _; use serde::Serialize; +use sqlx::SqlitePool; use web_push::{ ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder, }; -use crate::error::failed::{Failed, ResultExt as _}; +use crate::{ + db, + error::failed::{Failed, ResultExt as _}, + push::repo::Provider, +}; #[async_trait::async_trait] pub trait Publish { - async fn publish<'s, M>( + async fn publish<M>( &self, message: M, signer: &PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send, - ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed> + subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send, + ) -> Result<(), Failed> where M: Serialize + Send + 'static; } #[derive(Clone)] pub struct Publisher { + db: SqlitePool, client: IsahcWebPushClient, } impl Publisher { - pub fn new() -> Result<Self, WebPushError> { + pub fn new(db: SqlitePool) -> Result<Self, WebPushError> { let client = IsahcWebPushClient::new()?; - Ok(Self { client }) + Ok(Self { db, client }) } fn prepare_message( @@ -49,16 +55,52 @@ impl Publisher { Ok(message) } + + async fn settle_failed( + &self, + failures: Vec<(&SubscriptionInfo, WebPushError)>, + ) -> Result<(), Failed> { + if !failures.is_empty() { + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + // Note that data integrity guarantees from whatever transaction originally read the + // subscriptions may no longer be valid now. Time has passed. Depending on how slow + // delivering push notifications is, potentially a _lot_ of time has passed. + + for (sub, err) in &failures { + match err { + // I _think_ this is the complete set of permanent failures. See + // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete + // list. + WebPushError::Unauthorized(_) + | WebPushError::InvalidUri + | WebPushError::EndpointNotValid(_) + | WebPushError::EndpointNotFound(_) + | WebPushError::InvalidCryptoKeys + | WebPushError::MissingCryptoKeys => { + tx.push() + .unsubscribe(sub) + .await + .fail("Failed to unsubscribe after permanent push message rejection")?; + } + _ => (), + } + } + + tx.commit().await.fail(db::failed::COMMIT)?; + } + + Ok(()) + } } #[async_trait::async_trait] impl Publish for Publisher { - async fn publish<'s, M>( + async fn publish<M>( &self, message: M, signer: &PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send, - ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed> + subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send, + ) -> Result<(), Failed> where M: Serialize + Send + 'static, { @@ -74,12 +116,14 @@ impl Publish for Publisher { .into_iter() .map(async |(sub, message)| (sub, self.client.send(message).await)); - let failures = join_all(deliveries) + let failures: Vec<_> = join_all(deliveries) .await .into_iter() .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) .collect(); - Ok(failures) + self.settle_failed(failures).await?; + + Ok(()) } } |
