diff options
Diffstat (limited to 'src/push/app.rs')
| -rw-r--r-- | src/push/app.rs | 111 |
1 files changed, 43 insertions, 68 deletions
diff --git a/src/push/app.rs b/src/push/app.rs index 5e57800..f7846a6 100644 --- a/src/push/app.rs +++ b/src/push/app.rs @@ -1,30 +1,26 @@ -use futures::future::join_all; -use itertools::Itertools as _; use p256::ecdsa::VerifyingKey; use sqlx::SqlitePool; -use web_push::{ - ContentEncoding, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError, - WebPushMessage, WebPushMessageBuilder, -}; +use web_push::{SubscriptionInfo, WebPushError}; use super::repo::Provider as _; use crate::{ db, - error::failed::{ErrorExt, Failed, ResultExt as _}, + error::failed::{ErrorExt as _, Failed, ResultExt as _}, + event::Heartbeat, login::Login, + push::publisher::Publish, token::extract::Identity, - vapid, vapid::repo::Provider as _, }; pub struct Push<P> { db: SqlitePool, - webpush: P, + publisher: P, } impl<P> Push<P> { - pub const fn new(db: SqlitePool, webpush: P) -> Self { - Self { db, webpush } + pub const fn new(db: SqlitePool, publisher: P) -> Self { + Self { db, publisher } } pub async fn subscribe( @@ -80,46 +76,41 @@ impl<P> Push<P> { impl<P> Push<P> where - P: WebPushClient, + P: Publish, { - fn prepare_ping( - signer: &PartialVapidSignatureBuilder, - subscription: &SubscriptionInfo, - ) -> Result<WebPushMessage, WebPushError> { - let signature = signer.clone().add_sub_info(subscription).build()?; - - let payload = "ping".as_bytes(); - - let mut message = WebPushMessageBuilder::new(subscription); - message.set_payload(ContentEncoding::Aes128Gcm, payload); - message.set_vapid_signature(signature); - let message = message.build()?; - - Ok(message) - } - pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> { - let mut tx = self.db.begin().await?; - - let signer = tx.vapid().signer().await?; - let subscriptions = tx.push().by_login(recipient).await?; - - let pings: Vec<_> = subscriptions - .into_iter() - .map(|sub| Self::prepare_ping(&signer, &sub).map(|message| (sub, message))) - .try_collect()?; + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; - let deliveries = pings - .into_iter() - .map(async |(sub, message)| (sub, self.webpush.send(message).await)); + let signer = tx + .vapid() + .signer() + .await + .fail("Failed to load current VAPID signer")?; + let subscriptions = tx.push().by_login(recipient).await.fail_with(|| { + format!( + "Failed to find push subscriptions for login: {}", + recipient.id + ) + })?; + + // We're about to perform some fairly IO-intensive outside actions. Holding a tx open + // across them raises the risk that other clients will encounter errors due to locks from + // this transaction, so release it here. We'll open a new transaction if there's something + // we need to write. + tx.commit().await.fail(db::failed::COMMIT)?; - let failures: Vec<_> = join_all(deliveries) + let failures = self + .publisher + .publish(Heartbeat::Heartbeat, &signer, &subscriptions) .await - .into_iter() - .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) - .collect(); + .fail("Failed to send push message")?; if !failures.is_empty() { + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + // Note that data integrity guarantees from the original transaction to read + // subscriptions may no longer be valid now. Time has passed. Depending on how slow + // delivering push notifications is, potentially a _lot_ of time has passed. + for (sub, err) in &failures { match err { // I _think_ this is the complete set of permanent failures. See @@ -131,19 +122,21 @@ where | WebPushError::EndpointNotFound(_) | WebPushError::InvalidCryptoKeys | WebPushError::MissingCryptoKeys => { - tx.push().unsubscribe(sub).await?; + tx.push() + .unsubscribe(sub) + .await + .fail("Failed to unsubscribe after permanent push message rejection")?; } _ => (), } } + tx.commit().await.fail(db::failed::COMMIT)?; + return Err(PushError::Delivery( failures.into_iter().map(|(_, err)| err).collect(), )); } - - tx.commit().await?; - Ok(()) } } @@ -163,26 +156,8 @@ pub enum SubscribeError { #[derive(Debug, thiserror::Error)] pub enum PushError { - #[error(transparent)] - Database(#[from] sqlx::Error), - #[error(transparent)] - Ecdsa(#[from] p256::ecdsa::Error), - #[error(transparent)] - Pkcs8(#[from] p256::pkcs8::Error), - #[error(transparent)] - WebPush(#[from] WebPushError), #[error("push message delivery failures: {0:?}")] Delivery(Vec<WebPushError>), -} - -impl From<vapid::repo::Error> for PushError { - fn from(error: vapid::repo::Error) -> Self { - use vapid::repo::Error; - match error { - Error::Database(error) => error.into(), - Error::Ecdsa(error) => error.into(), - Error::Pkcs8(error) => error.into(), - Error::WebPush(error) => error.into(), - } - } + #[error(transparent)] + Failed(#[from] Failed), } |
