diff options
Diffstat (limited to 'src/push/publisher.rs')
| -rw-r--r-- | src/push/publisher.rs | 66 |
1 files changed, 55 insertions, 11 deletions
diff --git a/src/push/publisher.rs b/src/push/publisher.rs index d6227a2..ef23f2f 100644 --- a/src/push/publisher.rs +++ b/src/push/publisher.rs @@ -1,34 +1,40 @@ use futures::future::join_all; use itertools::Itertools as _; use serde::Serialize; +use sqlx::SqlitePool; use web_push::{ ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder, }; -use crate::error::failed::{Failed, ResultExt as _}; +use crate::{ + db, + error::failed::{Failed, ResultExt as _}, + push::repo::Provider, +}; #[async_trait::async_trait] pub trait Publish { - async fn publish<'s, M>( + async fn publish<M>( &self, message: M, signer: &PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send, - ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed> + subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send, + ) -> Result<(), Failed> where M: Serialize + Send + 'static; } #[derive(Clone)] pub struct Publisher { + db: SqlitePool, client: IsahcWebPushClient, } impl Publisher { - pub fn new() -> Result<Self, WebPushError> { + pub fn new(db: SqlitePool) -> Result<Self, WebPushError> { let client = IsahcWebPushClient::new()?; - Ok(Self { client }) + Ok(Self { db, client }) } fn prepare_message( @@ -49,16 +55,52 @@ impl Publisher { Ok(message) } + + async fn settle_failed( + &self, + failures: Vec<(&SubscriptionInfo, WebPushError)>, + ) -> Result<(), Failed> { + if !failures.is_empty() { + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + // Note that data integrity guarantees from whatever transaction originally read the + // subscriptions may no longer be valid now. Time has passed. Depending on how slow + // delivering push notifications is, potentially a _lot_ of time has passed. + + for (sub, err) in &failures { + match err { + // I _think_ this is the complete set of permanent failures. See + // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete + // list. + WebPushError::Unauthorized(_) + | WebPushError::InvalidUri + | WebPushError::EndpointNotValid(_) + | WebPushError::EndpointNotFound(_) + | WebPushError::InvalidCryptoKeys + | WebPushError::MissingCryptoKeys => { + tx.push() + .unsubscribe(sub) + .await + .fail("Failed to unsubscribe after permanent push message rejection")?; + } + _ => (), + } + } + + tx.commit().await.fail(db::failed::COMMIT)?; + } + + Ok(()) + } } #[async_trait::async_trait] impl Publish for Publisher { - async fn publish<'s, M>( + async fn publish<M>( &self, message: M, signer: &PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send, - ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed> + subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send, + ) -> Result<(), Failed> where M: Serialize + Send + 'static, { @@ -74,12 +116,14 @@ impl Publish for Publisher { .into_iter() .map(async |(sub, message)| (sub, self.client.send(message).await)); - let failures = join_all(deliveries) + let failures: Vec<_> = join_all(deliveries) .await .into_iter() .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) .collect(); - Ok(failures) + self.settle_failed(failures).await?; + + Ok(()) } } |
