summaryrefslogtreecommitdiff
path: root/src/push/publisher.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/push/publisher.rs')
-rw-r--r--src/push/publisher.rs66
1 files changed, 55 insertions, 11 deletions
diff --git a/src/push/publisher.rs b/src/push/publisher.rs
index d6227a2..ef23f2f 100644
--- a/src/push/publisher.rs
+++ b/src/push/publisher.rs
@@ -1,34 +1,40 @@
use futures::future::join_all;
use itertools::Itertools as _;
use serde::Serialize;
+use sqlx::SqlitePool;
use web_push::{
ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo,
WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder,
};
-use crate::error::failed::{Failed, ResultExt as _};
+use crate::{
+ db,
+ error::failed::{Failed, ResultExt as _},
+ push::repo::Provider,
+};
#[async_trait::async_trait]
pub trait Publish {
- async fn publish<'s, M>(
+ async fn publish<M>(
&self,
message: M,
signer: &PartialVapidSignatureBuilder,
- subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send,
- ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed>
+ subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send,
+ ) -> Result<(), Failed>
where
M: Serialize + Send + 'static;
}
#[derive(Clone)]
pub struct Publisher {
+ db: SqlitePool,
client: IsahcWebPushClient,
}
impl Publisher {
- pub fn new() -> Result<Self, WebPushError> {
+ pub fn new(db: SqlitePool) -> Result<Self, WebPushError> {
let client = IsahcWebPushClient::new()?;
- Ok(Self { client })
+ Ok(Self { db, client })
}
fn prepare_message(
@@ -49,16 +55,52 @@ impl Publisher {
Ok(message)
}
+
+ async fn settle_failed(
+ &self,
+ failures: Vec<(&SubscriptionInfo, WebPushError)>,
+ ) -> Result<(), Failed> {
+ if !failures.is_empty() {
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
+ // Note that data integrity guarantees from whatever transaction originally read the
+ // subscriptions may no longer be valid now. Time has passed. Depending on how slow
+ // delivering push notifications is, potentially a _lot_ of time has passed.
+
+ for (sub, err) in &failures {
+ match err {
+ // I _think_ this is the complete set of permanent failures. See
+ // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete
+ // list.
+ WebPushError::Unauthorized(_)
+ | WebPushError::InvalidUri
+ | WebPushError::EndpointNotValid(_)
+ | WebPushError::EndpointNotFound(_)
+ | WebPushError::InvalidCryptoKeys
+ | WebPushError::MissingCryptoKeys => {
+ tx.push()
+ .unsubscribe(sub)
+ .await
+ .fail("Failed to unsubscribe after permanent push message rejection")?;
+ }
+ _ => (),
+ }
+ }
+
+ tx.commit().await.fail(db::failed::COMMIT)?;
+ }
+
+ Ok(())
+ }
}
#[async_trait::async_trait]
impl Publish for Publisher {
- async fn publish<'s, M>(
+ async fn publish<M>(
&self,
message: M,
signer: &PartialVapidSignatureBuilder,
- subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send,
- ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed>
+ subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send,
+ ) -> Result<(), Failed>
where
M: Serialize + Send + 'static,
{
@@ -74,12 +116,14 @@ impl Publish for Publisher {
.into_iter()
.map(async |(sub, message)| (sub, self.client.send(message).await));
- let failures = join_all(deliveries)
+ let failures: Vec<_> = join_all(deliveries)
.await
.into_iter()
.filter_map(|(sub, result)| result.err().map(|err| (sub, err)))
.collect();
- Ok(failures)
+ self.settle_failed(failures).await?;
+
+ Ok(())
}
}