use p256::ecdsa::VerifyingKey; use sqlx::SqlitePool; use web_push::{SubscriptionInfo, WebPushError}; use super::repo::Provider as _; use crate::{ db, error::failed::{ErrorExt as _, Failed, ResultExt as _}, event::Heartbeat, login::Login, push::publisher::Publish, token::extract::Identity, vapid::repo::Provider as _, }; pub struct Push

{ db: SqlitePool, publisher: P, } impl

Push

{ pub const fn new(db: SqlitePool, publisher: P) -> Self { Self { db, publisher } } pub async fn subscribe( &self, subscriber: &Identity, subscription: &SubscriptionInfo, vapid: &VerifyingKey, ) -> Result<(), SubscribeError> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let current = tx .vapid() .current() .await .fail("Failed to load current VAPID key")?; if vapid != ¤t.key { return Err(SubscribeError::StaleVapidKey(current.key)); } match tx.push().create(&subscriber.token, subscription).await { Ok(()) => (), Err(err) => { if let Some(err) = err.as_database_error() && err.is_unique_violation() { let current = tx .push() .by_endpoint(&subscriber.login, &subscription.endpoint) .await .fail("Failed to load existing subscriptions for endpoint")?; // If we already have a subscription for this endpoint, with _different_ // parameters, then this is a client error. They shouldn't reuse endpoint URLs, // per the various RFCs. // // However, if we have a subscription for this endpoint with the same parameters // then we accept it and silently do nothing. This may happen if, for example, // the subscribe request is retried due to a network interruption where it's // not clear whether the original request succeeded. if ¤t != subscription { return Err(SubscribeError::Duplicate); } } else { return Err(err.fail("Failed to create push subscription")); } } } tx.commit().await.fail(db::failed::COMMIT)?; Ok(()) } } impl

Push

where P: Publish, { pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let signer = tx .vapid() .signer() .await .fail("Failed to load current VAPID signer")?; let subscriptions = tx.push().by_login(recipient).await.fail_with(|| { format!( "Failed to find push subscriptions for login: {}", recipient.id ) })?; // We're about to perform some fairly IO-intensive outside actions. Holding a tx open // across them raises the risk that other clients will encounter errors due to locks from // this transaction, so release it here. We'll open a new transaction if there's something // we need to write. tx.commit().await.fail(db::failed::COMMIT)?; let failures = self .publisher .publish(Heartbeat::Heartbeat, &signer, &subscriptions) .await .fail("Failed to send push message")?; if !failures.is_empty() { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; // Note that data integrity guarantees from the original transaction to read // subscriptions may no longer be valid now. Time has passed. Depending on how slow // delivering push notifications is, potentially a _lot_ of time has passed. for (sub, err) in &failures { match err { // I _think_ this is the complete set of permanent failures. See // for a complete // list. WebPushError::Unauthorized(_) | WebPushError::InvalidUri | WebPushError::EndpointNotValid(_) | WebPushError::EndpointNotFound(_) | WebPushError::InvalidCryptoKeys | WebPushError::MissingCryptoKeys => { tx.push() .unsubscribe(sub) .await .fail("Failed to unsubscribe after permanent push message rejection")?; } _ => (), } } tx.commit().await.fail(db::failed::COMMIT)?; return Err(PushError::Delivery( failures.into_iter().map(|(_, err)| err).collect(), )); } Ok(()) } } #[derive(Debug, thiserror::Error)] pub enum SubscribeError { #[error("subscription created with stale VAPID key")] StaleVapidKey(VerifyingKey), #[error("subscription already exists for endpoint")] // The endpoint URL is not included in the error, as it is a bearer credential in its own right // and we want to limit its proliferation. The only intended recipient of this message is the // client, which already knows the endpoint anyways and doesn't need us to tell them. Duplicate, #[error(transparent)] Failed(#[from] Failed), } #[derive(Debug, thiserror::Error)] pub enum PushError { #[error("push message delivery failures: {0:?}")] Delivery(Vec), #[error(transparent)] Failed(#[from] Failed), }