summaryrefslogtreecommitdiff
path: root/src/push/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2025-12-09 15:13:21 -0500
committerOwen Jacobson <owen@grimoire.ca>2025-12-17 15:48:20 -0500
commit3c697f5fb1b8dbad46eac8fa299ed7cebfb36159 (patch)
treeb7854fb23d1e104f928acfe3bba75ea3b74b83d9 /src/push/app.rs
parent41a5a0f7e13bf5a82aaef59e34eb68f0fe7fa7f5 (diff)
Factor push message publication out to its own helper component.
The `Publisher` component handles the details of web push delivery. Callers must provide the subscription set, the current signer, and the message, while the publisher handles encoding and communication with web push endpoints. To facilitate testing, `Publisher` implements `Publish`, which is a new trait with the same interface. Components that might publish web push messages should rely on the trait where possible. The test suite now constructs an app with a dummy `Publish` impl, which captures push messages for examination. Note that the testing implementation of `Publish` is hand-crafted, and presently only acts to record the arguments it receives. The other alternative was to use a mocking library, such as `mockit`, and while I've used that approach before, I'm not super comfortable with the complexity in this situation. I think we can maintain a more reasonable testing `Publish` impl by hand, at least for now, and we can revisit that decision later if need be. Tests for the `ping` endpoint have been migrated to this endpoint.
Diffstat (limited to 'src/push/app.rs')
-rw-r--r--src/push/app.rs68
1 files changed, 23 insertions, 45 deletions
diff --git a/src/push/app.rs b/src/push/app.rs
index 2bd6c25..ebfc220 100644
--- a/src/push/app.rs
+++ b/src/push/app.rs
@@ -1,29 +1,26 @@
-use futures::future::join_all;
-use itertools::Itertools as _;
use p256::ecdsa::VerifyingKey;
use sqlx::SqlitePool;
-use web_push::{
- ContentEncoding, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError,
- WebPushMessage, WebPushMessageBuilder,
-};
+use web_push::{SubscriptionInfo, WebPushError};
use super::repo::Provider as _;
use crate::{
db,
error::failed::{ErrorExt as _, Failed, ResultExt as _},
+ event::Heartbeat,
login::Login,
+ push::publisher::Publish,
token::extract::Identity,
vapid::repo::Provider as _,
};
pub struct Push<P> {
db: SqlitePool,
- webpush: P,
+ publisher: P,
}
impl<P> Push<P> {
- pub const fn new(db: SqlitePool, webpush: P) -> Self {
- Self { db, webpush }
+ pub const fn new(db: SqlitePool, publisher: P) -> Self {
+ Self { db, publisher }
}
pub async fn subscribe(
@@ -79,28 +76,8 @@ impl<P> Push<P> {
impl<P> Push<P>
where
- P: WebPushClient,
+ P: Publish,
{
- fn prepare_ping(
- signer: &PartialVapidSignatureBuilder,
- subscription: &SubscriptionInfo,
- ) -> Result<WebPushMessage, PushError> {
- let signature = signer
- .clone()
- .add_sub_info(subscription)
- .build()
- .fail("Failed to build VAPID signature")?;
-
- let payload = "ping".as_bytes();
-
- let mut message = WebPushMessageBuilder::new(subscription);
- message.set_payload(ContentEncoding::Aes128Gcm, payload);
- message.set_vapid_signature(signature);
- let message = message.build().fail("Failed to build push message")?;
-
- Ok(message)
- }
-
pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> {
let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
@@ -116,22 +93,24 @@ where
)
})?;
- let pings: Vec<_> = subscriptions
- .into_iter()
- .map(|sub| Self::prepare_ping(&signer, &sub).map(|message| (sub, message)))
- .try_collect()?;
-
- let deliveries = pings
- .into_iter()
- .map(async |(sub, message)| (sub, self.webpush.send(message).await));
+ // We're about to perform some fairly IO-intensive outside actions. Holding a tx open
+ // across them raises the risk that other clients will encounter errors due to locks from
+ // this transaction, so release it here. We'll open a new transaction if there's something
+ // we need to write.
+ tx.commit().await.fail(db::failed::COMMIT)?;
- let failures: Vec<_> = join_all(deliveries)
+ let failures = self
+ .publisher
+ .publish(Heartbeat::Heartbeat, signer, subscriptions)
.await
- .into_iter()
- .filter_map(|(sub, result)| result.err().map(|err| (sub, err)))
- .collect();
+ .fail("Failed to send push message")?;
if !failures.is_empty() {
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
+ // Note that data integrity guarantees from the original transaction to read
+ // subscriptions may no longer be valid now. Time has passed. Depending on how slow
+ // delivering push notifications is, potentially a _lot_ of time has passed.
+
for (sub, err) in &failures {
match err {
// I _think_ this is the complete set of permanent failures. See
@@ -152,13 +131,12 @@ where
}
}
+ tx.commit().await.fail(db::failed::COMMIT)?;
+
return Err(PushError::Delivery(
failures.into_iter().map(|(_, err)| err).collect(),
));
}
-
- tx.commit().await.fail(db::failed::COMMIT)?;
-
Ok(())
}
}