use p256::ecdsa::VerifyingKey; use sqlx::SqlitePool; use web_push::{SubscriptionInfo, WebPushError}; use super::repo::Provider as _; use crate::{ db, error::failed::{ErrorExt as _, Failed, ResultExt as _}, event::Heartbeat, login::Login, push::publisher::Publish, token::extract::Identity, vapid::repo::Provider as _, }; pub struct Push
{ db: SqlitePool, publisher: P, } impl
Push
{ pub const fn new(db: SqlitePool, publisher: P) -> Self { Self { db, publisher } } pub async fn subscribe( &self, subscriber: &Identity, subscription: &SubscriptionInfo, vapid: &VerifyingKey, ) -> Result<(), SubscribeError> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let current = tx .vapid() .current() .await .fail("Failed to load current VAPID key")?; if vapid != ¤t.key { return Err(SubscribeError::StaleVapidKey(current.key)); } match tx.push().create(&subscriber.token, subscription).await { Ok(()) => (), Err(err) => { if let Some(err) = err.as_database_error() && err.is_unique_violation() { let current = tx .push() .by_endpoint(&subscriber.login, &subscription.endpoint) .await .fail("Failed to load existing subscriptions for endpoint")?; // If we already have a subscription for this endpoint, with _different_ // parameters, then this is a client error. They shouldn't reuse endpoint URLs, // per the various RFCs. // // However, if we have a subscription for this endpoint with the same parameters // then we accept it and silently do nothing. This may happen if, for example, // the subscribe request is retried due to a network interruption where it's // not clear whether the original request succeeded. if ¤t != subscription { return Err(SubscribeError::Duplicate); } } else { return Err(err.fail("Failed to create push subscription")); } } } tx.commit().await.fail(db::failed::COMMIT)?; Ok(()) } } impl
Push
where
P: Publish,
{
pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> {
let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
let signer = tx
.vapid()
.signer()
.await
.fail("Failed to load current VAPID signer")?;
let subscriptions = tx.push().by_login(recipient).await.fail_with(|| {
format!(
"Failed to find push subscriptions for login: {}",
recipient.id
)
})?;
// We're about to perform some fairly IO-intensive outside actions. Holding a tx open
// across them raises the risk that other clients will encounter errors due to locks from
// this transaction, so release it here. We'll open a new transaction if there's something
// we need to write.
tx.commit().await.fail(db::failed::COMMIT)?;
let failures = self
.publisher
.publish(Heartbeat::Heartbeat, &signer, &subscriptions)
.await
.fail("Failed to send push message")?;
if !failures.is_empty() {
let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
// Note that data integrity guarantees from the original transaction to read
// subscriptions may no longer be valid now. Time has passed. Depending on how slow
// delivering push notifications is, potentially a _lot_ of time has passed.
for (sub, err) in &failures {
match err {
// I _think_ this is the complete set of permanent failures. See
//