use futures::future::join_all; use itertools::Itertools as _; use serde::Serialize; use sqlx::SqlitePool; use web_push::{ ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder, }; use crate::{ db, error::failed::{Failed, ResultExt as _}, push::repo::Provider, }; #[async_trait::async_trait] pub trait Publish { async fn publish( &self, message: M, signer: &PartialVapidSignatureBuilder, subscriptions: impl IntoIterator + Send, ) -> Result<(), Failed> where M: Serialize + Send + 'static; } #[derive(Clone)] pub struct Publisher { db: SqlitePool, client: IsahcWebPushClient, } impl Publisher { pub fn new(db: SqlitePool) -> Result { let client = IsahcWebPushClient::new()?; Ok(Self { db, client }) } fn prepare_message( payload: &[u8], signer: &PartialVapidSignatureBuilder, subscription: &SubscriptionInfo, ) -> Result { let signature = signer .clone() .add_sub_info(subscription) .build() .fail("Failed to build VAPID signature")?; let mut message = WebPushMessageBuilder::new(subscription); message.set_payload(ContentEncoding::Aes128Gcm, payload); message.set_vapid_signature(signature); let message = message.build().fail("Failed to build push message")?; Ok(message) } async fn settle_failed( &self, failures: Vec<(&SubscriptionInfo, WebPushError)>, ) -> Result<(), Failed> { if !failures.is_empty() { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; // Note that data integrity guarantees from whatever transaction originally read the // subscriptions may no longer be valid now. Time has passed. Depending on how slow // delivering push notifications is, potentially a _lot_ of time has passed. for (sub, err) in &failures { match err { // I _think_ this is the complete set of permanent failures. See // for a complete // list. WebPushError::Unauthorized(_) | WebPushError::InvalidUri | WebPushError::EndpointNotValid(_) | WebPushError::EndpointNotFound(_) | WebPushError::InvalidCryptoKeys | WebPushError::MissingCryptoKeys => { tx.push() .unsubscribe(sub) .await .fail("Failed to unsubscribe after permanent push message rejection")?; } _ => (), } } tx.commit().await.fail(db::failed::COMMIT)?; } Ok(()) } } #[async_trait::async_trait] impl Publish for Publisher { async fn publish( &self, message: M, signer: &PartialVapidSignatureBuilder, subscriptions: impl IntoIterator + Send, ) -> Result<(), Failed> where M: Serialize + Send + 'static, { let payload = serde_json::to_vec_pretty(&message) .fail("Failed to encode web push message to JSON")?; let messages: Vec<_> = subscriptions .into_iter() .map(|sub| Self::prepare_message(&payload, signer, sub).map(|message| (sub, message))) .try_collect()?; let deliveries = messages .into_iter() .map(async |(sub, message)| (sub, self.client.send(message).await)); let failures: Vec<_> = join_all(deliveries) .await .into_iter() .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) .collect(); self.settle_failed(failures).await?; Ok(()) } }