summaryrefslogtreecommitdiff
path: root/src/push/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/push/app.rs')
-rw-r--r--src/push/app.rs111
1 files changed, 43 insertions, 68 deletions
diff --git a/src/push/app.rs b/src/push/app.rs
index 5e57800..f7846a6 100644
--- a/src/push/app.rs
+++ b/src/push/app.rs
@@ -1,30 +1,26 @@
-use futures::future::join_all;
-use itertools::Itertools as _;
use p256::ecdsa::VerifyingKey;
use sqlx::SqlitePool;
-use web_push::{
- ContentEncoding, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError,
- WebPushMessage, WebPushMessageBuilder,
-};
+use web_push::{SubscriptionInfo, WebPushError};
use super::repo::Provider as _;
use crate::{
db,
- error::failed::{ErrorExt, Failed, ResultExt as _},
+ error::failed::{ErrorExt as _, Failed, ResultExt as _},
+ event::Heartbeat,
login::Login,
+ push::publisher::Publish,
token::extract::Identity,
- vapid,
vapid::repo::Provider as _,
};
pub struct Push<P> {
db: SqlitePool,
- webpush: P,
+ publisher: P,
}
impl<P> Push<P> {
- pub const fn new(db: SqlitePool, webpush: P) -> Self {
- Self { db, webpush }
+ pub const fn new(db: SqlitePool, publisher: P) -> Self {
+ Self { db, publisher }
}
pub async fn subscribe(
@@ -80,46 +76,41 @@ impl<P> Push<P> {
impl<P> Push<P>
where
- P: WebPushClient,
+ P: Publish,
{
- fn prepare_ping(
- signer: &PartialVapidSignatureBuilder,
- subscription: &SubscriptionInfo,
- ) -> Result<WebPushMessage, WebPushError> {
- let signature = signer.clone().add_sub_info(subscription).build()?;
-
- let payload = "ping".as_bytes();
-
- let mut message = WebPushMessageBuilder::new(subscription);
- message.set_payload(ContentEncoding::Aes128Gcm, payload);
- message.set_vapid_signature(signature);
- let message = message.build()?;
-
- Ok(message)
- }
-
pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> {
- let mut tx = self.db.begin().await?;
-
- let signer = tx.vapid().signer().await?;
- let subscriptions = tx.push().by_login(recipient).await?;
-
- let pings: Vec<_> = subscriptions
- .into_iter()
- .map(|sub| Self::prepare_ping(&signer, &sub).map(|message| (sub, message)))
- .try_collect()?;
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
- let deliveries = pings
- .into_iter()
- .map(async |(sub, message)| (sub, self.webpush.send(message).await));
+ let signer = tx
+ .vapid()
+ .signer()
+ .await
+ .fail("Failed to load current VAPID signer")?;
+ let subscriptions = tx.push().by_login(recipient).await.fail_with(|| {
+ format!(
+ "Failed to find push subscriptions for login: {}",
+ recipient.id
+ )
+ })?;
+
+ // We're about to perform some fairly IO-intensive outside actions. Holding a tx open
+ // across them raises the risk that other clients will encounter errors due to locks from
+ // this transaction, so release it here. We'll open a new transaction if there's something
+ // we need to write.
+ tx.commit().await.fail(db::failed::COMMIT)?;
- let failures: Vec<_> = join_all(deliveries)
+ let failures = self
+ .publisher
+ .publish(Heartbeat::Heartbeat, &signer, &subscriptions)
.await
- .into_iter()
- .filter_map(|(sub, result)| result.err().map(|err| (sub, err)))
- .collect();
+ .fail("Failed to send push message")?;
if !failures.is_empty() {
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
+ // Note that data integrity guarantees from the original transaction to read
+ // subscriptions may no longer be valid now. Time has passed. Depending on how slow
+ // delivering push notifications is, potentially a _lot_ of time has passed.
+
for (sub, err) in &failures {
match err {
// I _think_ this is the complete set of permanent failures. See
@@ -131,19 +122,21 @@ where
| WebPushError::EndpointNotFound(_)
| WebPushError::InvalidCryptoKeys
| WebPushError::MissingCryptoKeys => {
- tx.push().unsubscribe(sub).await?;
+ tx.push()
+ .unsubscribe(sub)
+ .await
+ .fail("Failed to unsubscribe after permanent push message rejection")?;
}
_ => (),
}
}
+ tx.commit().await.fail(db::failed::COMMIT)?;
+
return Err(PushError::Delivery(
failures.into_iter().map(|(_, err)| err).collect(),
));
}
-
- tx.commit().await?;
-
Ok(())
}
}
@@ -163,26 +156,8 @@ pub enum SubscribeError {
#[derive(Debug, thiserror::Error)]
pub enum PushError {
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Ecdsa(#[from] p256::ecdsa::Error),
- #[error(transparent)]
- Pkcs8(#[from] p256::pkcs8::Error),
- #[error(transparent)]
- WebPush(#[from] WebPushError),
#[error("push message delivery failures: {0:?}")]
Delivery(Vec<WebPushError>),
-}
-
-impl From<vapid::repo::Error> for PushError {
- fn from(error: vapid::repo::Error) -> Self {
- use vapid::repo::Error;
- match error {
- Error::Database(error) => error.into(),
- Error::Ecdsa(error) => error.into(),
- Error::Pkcs8(error) => error.into(),
- Error::WebPush(error) => error.into(),
- }
- }
+ #[error(transparent)]
+ Failed(#[from] Failed),
}