use p256::ecdsa::VerifyingKey; use sqlx::SqlitePool; use web_push::SubscriptionInfo; use super::repo::Provider as _; use crate::{ db, error::failed::{ErrorExt as _, Failed, ResultExt as _}, event::Heartbeat, login::Login, push::publisher::Publish, token::extract::Identity, vapid::repo::Provider as _, }; pub struct Push

{ db: SqlitePool, publisher: P, } impl

Push

{ pub const fn new(db: SqlitePool, publisher: P) -> Self { Self { db, publisher } } pub async fn subscribe( &self, subscriber: &Identity, subscription: &SubscriptionInfo, vapid: &VerifyingKey, ) -> Result<(), SubscribeError> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let current = tx .vapid() .current() .await .fail("Failed to load current VAPID key")?; if vapid != ¤t.key { return Err(SubscribeError::StaleVapidKey(current.key)); } match tx.push().create(&subscriber.token, subscription).await { Ok(()) => (), Err(err) => { if let Some(err) = err.as_database_error() && err.is_unique_violation() { let current = tx .push() .by_endpoint(&subscriber.login, &subscription.endpoint) .await .fail("Failed to load existing subscriptions for endpoint")?; // If we already have a subscription for this endpoint, with _different_ // parameters, then this is a client error. They shouldn't reuse endpoint URLs, // per the various RFCs. // // However, if we have a subscription for this endpoint with the same parameters // then we accept it and silently do nothing. This may happen if, for example, // the subscribe request is retried due to a network interruption where it's // not clear whether the original request succeeded. if ¤t != subscription { return Err(SubscribeError::Duplicate); } } else { return Err(err.fail("Failed to create push subscription")); } } } tx.commit().await.fail(db::failed::COMMIT)?; Ok(()) } } impl

Push

where P: Publish, { pub async fn ping(&self, recipient: &Login) -> Result<(), Failed> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let signer = tx .vapid() .signer() .await .fail("Failed to load current VAPID signer")?; let subscriptions = tx.push().by_login(recipient).await.fail_with(|| { format!( "Failed to find push subscriptions for login: {}", recipient.id ) })?; // We're about to perform some fairly IO-intensive outside actions. Holding a tx open // across them raises the risk that other clients will encounter errors due to locks from // this transaction, so release it here. We'll open a new transaction if there's something // we need to write. tx.commit().await.fail(db::failed::COMMIT)?; self.publisher .publish(Heartbeat::Heartbeat, &signer, &subscriptions) .await .fail("Failed to send push message")?; Ok(()) } } #[derive(Debug, thiserror::Error)] pub enum SubscribeError { #[error("subscription created with stale VAPID key")] StaleVapidKey(VerifyingKey), #[error("subscription already exists for endpoint")] // The endpoint URL is not included in the error, as it is a bearer credential in its own right // and we want to limit its proliferation. The only intended recipient of this message is the // client, which already knows the endpoint anyways and doesn't need us to tell them. Duplicate, #[error(transparent)] Failed(#[from] Failed), }