summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/cli.rs2
-rw-r--r--src/message/app.rs32
-rw-r--r--src/push/app.rs48
-rw-r--r--src/push/handlers/ping/test.rs200
-rw-r--r--src/push/publisher.rs66
-rw-r--r--src/test/webpush.rs29
-rw-r--r--ui/service-worker.js1
7 files changed, 69 insertions, 309 deletions
diff --git a/src/cli.rs b/src/cli.rs
index 971d1f9..e0d8b75 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -101,7 +101,7 @@ impl Args {
self.umask.set();
let pool = self.pool().await.fail("Failed to create database pool")?;
- let publisher = Publisher::new().fail("Failed to create web push publisher")?;
+ let publisher = Publisher::new(pool.clone()).fail("Failed to create web push publisher")?;
let app = App::from(pool, publisher);
match self.command {
diff --git a/src/message/app.rs b/src/message/app.rs
index b79dad0..4bf96bf 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -1,7 +1,6 @@
use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
-use web_push::WebPushError;
use super::{Body, History, Id, Message, history, repo::Provider as _};
use crate::{
@@ -184,39 +183,10 @@ where
self.events.broadcast_from(events.clone());
for event in events {
- let failures = self
- .publisher
+ self.publisher
.publish(Event::from(event), &signer, &push_recipients)
.await
.fail("Failed to publish push events")?;
-
- if !failures.is_empty() {
- let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
- // Note that data integrity guarantees from the original transaction to read
- // subscriptions may no longer be valid now. Time has passed. Depending on how slow
- // delivering push notifications is, potentially a _lot_ of time has passed.
-
- for (sub, err) in failures {
- match err {
- // I _think_ this is the complete set of permanent failures. See
- // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete
- // list.
- WebPushError::Unauthorized(_)
- | WebPushError::InvalidUri
- | WebPushError::EndpointNotValid(_)
- | WebPushError::EndpointNotFound(_)
- | WebPushError::InvalidCryptoKeys
- | WebPushError::MissingCryptoKeys => {
- tx.push().unsubscribe(sub).await.fail(
- "Failed to unsubscribe after permanent push message rejection",
- )?;
- }
- _ => (),
- }
- }
-
- tx.commit().await.fail(db::failed::COMMIT)?;
- }
}
Ok(message.as_sent())
diff --git a/src/push/app.rs b/src/push/app.rs
index f7846a6..1983055 100644
--- a/src/push/app.rs
+++ b/src/push/app.rs
@@ -1,6 +1,6 @@
use p256::ecdsa::VerifyingKey;
use sqlx::SqlitePool;
-use web_push::{SubscriptionInfo, WebPushError};
+use web_push::SubscriptionInfo;
use super::repo::Provider as _;
use crate::{
@@ -78,7 +78,7 @@ impl<P> Push<P>
where
P: Publish,
{
- pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> {
+ pub async fn ping(&self, recipient: &Login) -> Result<(), Failed> {
let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
let signer = tx
@@ -99,44 +99,10 @@ where
// we need to write.
tx.commit().await.fail(db::failed::COMMIT)?;
- let failures = self
- .publisher
+ self.publisher
.publish(Heartbeat::Heartbeat, &signer, &subscriptions)
.await
.fail("Failed to send push message")?;
-
- if !failures.is_empty() {
- let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
- // Note that data integrity guarantees from the original transaction to read
- // subscriptions may no longer be valid now. Time has passed. Depending on how slow
- // delivering push notifications is, potentially a _lot_ of time has passed.
-
- for (sub, err) in &failures {
- match err {
- // I _think_ this is the complete set of permanent failures. See
- // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete
- // list.
- WebPushError::Unauthorized(_)
- | WebPushError::InvalidUri
- | WebPushError::EndpointNotValid(_)
- | WebPushError::EndpointNotFound(_)
- | WebPushError::InvalidCryptoKeys
- | WebPushError::MissingCryptoKeys => {
- tx.push()
- .unsubscribe(sub)
- .await
- .fail("Failed to unsubscribe after permanent push message rejection")?;
- }
- _ => (),
- }
- }
-
- tx.commit().await.fail(db::failed::COMMIT)?;
-
- return Err(PushError::Delivery(
- failures.into_iter().map(|(_, err)| err).collect(),
- ));
- }
Ok(())
}
}
@@ -153,11 +119,3 @@ pub enum SubscribeError {
#[error(transparent)]
Failed(#[from] Failed),
}
-
-#[derive(Debug, thiserror::Error)]
-pub enum PushError {
- #[error("push message delivery failures: {0:?}")]
- Delivery(Vec<WebPushError>),
- #[error(transparent)]
- Failed(#[from] Failed),
-}
diff --git a/src/push/handlers/ping/test.rs b/src/push/handlers/ping/test.rs
index cc07ef0..70ba4bf 100644
--- a/src/push/handlers/ping/test.rs
+++ b/src/push/handlers/ping/test.rs
@@ -172,203 +172,3 @@ async fn ping_recipient_only() {
.any(|publish| publish.recipients.contains(&spectator_subscription))
);
}
-
-#[tokio::test]
-async fn ping_permanent_error() {
- let app = fixtures::scratch_app().await;
-
- let recipient = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let vapid = fixtures::vapid::key(&app).await;
-
- // Create subscriptions
- let subscription = SubscriptionInfo::new(
- "https://push.example.com/recipient/endpoint",
- "recipient-p256dh-key",
- "recipient-auth",
- );
- app.push()
- .subscribe(&recipient, &subscription, &vapid)
- .await
- .expect("creating a subscription succeeds");
-
- // Prepare the next ping attempt to fail
- app.publisher()
- .fail_next(&subscription, WebPushError::InvalidUri);
-
- // Send a ping
- super::handler(
- State(app.push()),
- recipient.clone(),
- Json(super::Request {}),
- )
- .await
- .expect_err("sending a ping with a permanently-failing subscription fails");
-
- // Confirm that it was actually sent
- let sent = app.publisher().sent();
-
- let subscriptions = HashSet::from([subscription.clone()]);
- assert!(
- sent.iter()
- .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.recipients == subscriptions)
- .exactly_one()
- .is_ok()
- );
-
- // Send a second ping
- let response = super::handler(State(app.push()), recipient, Json(super::Request {}))
- .await
- .expect("sending a ping with no subscriptions always succeeds");
-
- assert_eq!(StatusCode::ACCEPTED, response);
-
- // Confirm that it was _not_ sent this time, since it failed permanently last time
- let sent = app.publisher().sent();
-
- assert!(
- !sent
- .iter()
- .any(|publish| publish.recipients.contains(&subscription))
- );
-}
-
-#[tokio::test]
-async fn ping_temporary_error() {
- let app = fixtures::scratch_app().await;
-
- let recipient = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let vapid = fixtures::vapid::key(&app).await;
-
- // Create subscriptions
- let subscription = SubscriptionInfo::new(
- "https://push.example.com/recipient/endpoint",
- "recipient-p256dh-key",
- "recipient-auth",
- );
- app.push()
- .subscribe(&recipient, &subscription, &vapid)
- .await
- .expect("creating a subscription succeeds");
-
- // Prepare the next ping attempt to fail
- app.publisher().fail_next(
- &subscription,
- WebPushError::from(io::Error::other("transient IO error")),
- );
-
- // Send a ping
- super::handler(
- State(app.push()),
- recipient.clone(),
- Json(super::Request {}),
- )
- .await
- .expect_err("sending a ping with a temporarily-failing subscription fails");
-
- // Confirm that it was actually sent
- let sent = app.publisher().sent();
-
- let subscriptions = HashSet::from([subscription.clone()]);
- assert!(
- sent.iter()
- .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.recipients == subscriptions)
- .exactly_one()
- .is_ok()
- );
-
- // Send a second ping
- let response = super::handler(State(app.push()), recipient, Json(super::Request {}))
- .await
- .expect("sending a ping subscription succeeds now that the transient error has cleared");
- assert_eq!(StatusCode::ACCEPTED, response);
-
- // Confirm that it was _not_ sent this time, since it failed permanently last time
- let sent = app.publisher().sent();
-
- let subscriptions = HashSet::from([subscription]);
- assert!(
- sent.iter()
- .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.recipients == subscriptions)
- .exactly_one()
- .is_ok()
- );
-}
-
-#[tokio::test]
-async fn ping_multiple_subscriptions_with_failure() {
- let app = fixtures::scratch_app().await;
-
- let recipient = fixtures::identity::create(&app, &fixtures::now()).await;
- let vapid = fixtures::vapid::key(&app).await;
-
- // Create subscriptions
- let failing = SubscriptionInfo::new(
- "https://push.example.com/endpoint-failing",
- "testing-p256dh-key-failing",
- "testing-auth-failing",
- );
- let succeeding = SubscriptionInfo::new(
- "https://push.example.com/endpoint-succeeding",
- "testing-p256dh-key-succeeding",
- "testing-auth-succeeding",
- );
- let subscriptions = HashSet::from([failing.clone(), succeeding.clone()]);
- for subscription in &subscriptions {
- app.push()
- .subscribe(&recipient, subscription, &vapid)
- .await
- .expect("creating a subscription succeeds");
- }
-
- // Rig one of them to fail permanently
- app.publisher()
- .fail_next(&failing, WebPushError::InvalidUri);
-
- // Send a ping
- super::handler(
- State(app.push()),
- recipient.clone(),
- Json(super::Request {}),
- )
- .await
- .expect_err("sending a ping with a failing subscription fails");
-
- // Confirm that it was actually sent to both subs the first time
- let subscriptions = HashSet::from([failing.clone(), succeeding.clone()]);
- assert!(
- app.publisher()
- .sent()
- .iter()
- .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.recipients == subscriptions)
- .exactly_one()
- .is_ok()
- );
-
- // Send a second ping
- let response = super::handler(State(app.push()), recipient, Json(super::Request {}))
- .await
- .expect("sending a second ping succeeds after the failing subscription is removed");
- assert_eq!(StatusCode::ACCEPTED, response);
-
- // Confirm that it was only sent to the succeeding subscription
- let subscriptions = HashSet::from([succeeding]);
- let sent = app.publisher().sent();
- assert!(
- sent.iter()
- .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.recipients == subscriptions)
- .exactly_one()
- .is_ok()
- );
- assert!(
- !sent
- .iter()
- .any(|publish| publish.recipients.contains(&failing))
- );
-}
diff --git a/src/push/publisher.rs b/src/push/publisher.rs
index d6227a2..ef23f2f 100644
--- a/src/push/publisher.rs
+++ b/src/push/publisher.rs
@@ -1,34 +1,40 @@
use futures::future::join_all;
use itertools::Itertools as _;
use serde::Serialize;
+use sqlx::SqlitePool;
use web_push::{
ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo,
WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder,
};
-use crate::error::failed::{Failed, ResultExt as _};
+use crate::{
+ db,
+ error::failed::{Failed, ResultExt as _},
+ push::repo::Provider,
+};
#[async_trait::async_trait]
pub trait Publish {
- async fn publish<'s, M>(
+ async fn publish<M>(
&self,
message: M,
signer: &PartialVapidSignatureBuilder,
- subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send,
- ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed>
+ subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send,
+ ) -> Result<(), Failed>
where
M: Serialize + Send + 'static;
}
#[derive(Clone)]
pub struct Publisher {
+ db: SqlitePool,
client: IsahcWebPushClient,
}
impl Publisher {
- pub fn new() -> Result<Self, WebPushError> {
+ pub fn new(db: SqlitePool) -> Result<Self, WebPushError> {
let client = IsahcWebPushClient::new()?;
- Ok(Self { client })
+ Ok(Self { db, client })
}
fn prepare_message(
@@ -49,16 +55,52 @@ impl Publisher {
Ok(message)
}
+
+ async fn settle_failed(
+ &self,
+ failures: Vec<(&SubscriptionInfo, WebPushError)>,
+ ) -> Result<(), Failed> {
+ if !failures.is_empty() {
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
+ // Note that data integrity guarantees from whatever transaction originally read the
+ // subscriptions may no longer be valid now. Time has passed. Depending on how slow
+ // delivering push notifications is, potentially a _lot_ of time has passed.
+
+ for (sub, err) in &failures {
+ match err {
+ // I _think_ this is the complete set of permanent failures. See
+ // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete
+ // list.
+ WebPushError::Unauthorized(_)
+ | WebPushError::InvalidUri
+ | WebPushError::EndpointNotValid(_)
+ | WebPushError::EndpointNotFound(_)
+ | WebPushError::InvalidCryptoKeys
+ | WebPushError::MissingCryptoKeys => {
+ tx.push()
+ .unsubscribe(sub)
+ .await
+ .fail("Failed to unsubscribe after permanent push message rejection")?;
+ }
+ _ => (),
+ }
+ }
+
+ tx.commit().await.fail(db::failed::COMMIT)?;
+ }
+
+ Ok(())
+ }
}
#[async_trait::async_trait]
impl Publish for Publisher {
- async fn publish<'s, M>(
+ async fn publish<M>(
&self,
message: M,
signer: &PartialVapidSignatureBuilder,
- subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send,
- ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed>
+ subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send,
+ ) -> Result<(), Failed>
where
M: Serialize + Send + 'static,
{
@@ -74,12 +116,14 @@ impl Publish for Publisher {
.into_iter()
.map(async |(sub, message)| (sub, self.client.send(message).await));
- let failures = join_all(deliveries)
+ let failures: Vec<_> = join_all(deliveries)
.await
.into_iter()
.filter_map(|(sub, result)| result.err().map(|err| (sub, err)))
.collect();
- Ok(failures)
+ self.settle_failed(failures).await?;
+
+ Ok(())
}
}
diff --git a/src/test/webpush.rs b/src/test/webpush.rs
index 55caf19..656f66a 100644
--- a/src/test/webpush.rs
+++ b/src/test/webpush.rs
@@ -1,11 +1,11 @@
use std::{
any::Any,
- collections::{HashMap, HashSet},
+ collections::HashSet,
mem,
sync::{Arc, Mutex, MutexGuard},
};
-use web_push::{PartialVapidSignatureBuilder, SubscriptionInfo, WebPushError};
+use web_push::{PartialVapidSignatureBuilder, SubscriptionInfo};
use crate::{error::failed::Failed, push::Publish};
@@ -15,7 +15,6 @@ pub struct Client(Arc<Mutex<ClientInner>>);
#[derive(Default)]
struct ClientInner {
sent: Vec<Publication>,
- planned_failures: HashMap<SubscriptionInfo, WebPushError>,
}
impl Client {
@@ -33,35 +32,23 @@ impl Client {
let sent = &mut self.inner().sent;
mem::take(sent)
}
-
- pub fn fail_next(&self, subscription_info: &SubscriptionInfo, err: WebPushError) {
- let planned_failures = &mut self.inner().planned_failures;
- planned_failures.insert(subscription_info.clone(), err);
- }
}
#[async_trait::async_trait]
impl Publish for Client {
- async fn publish<'s, M>(
+ async fn publish<M>(
&self,
message: M,
_: &PartialVapidSignatureBuilder,
- subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send,
- ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed>
+ subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send,
+ ) -> Result<(), Failed>
where
M: Send + 'static,
{
let mut inner = self.inner();
- let message: Box<dyn Any + Send> = Box::new(message);
- let mut recipients = HashSet::new();
- let mut failures = Vec::new();
- for subscription in subscriptions {
- recipients.insert(subscription.clone());
- if let Some(err) = inner.planned_failures.remove(subscription) {
- failures.push((subscription, err));
- }
- }
+ let message: Box<dyn Any + Send> = Box::new(message);
+ let recipients = subscriptions.into_iter().cloned().collect();
let publication = Publication {
message,
@@ -69,7 +56,7 @@ impl Publish for Client {
};
inner.sent.push(publication);
- Ok(failures)
+ Ok(())
}
}
diff --git a/ui/service-worker.js b/ui/service-worker.js
index 81d0752..8a889ec 100644
--- a/ui/service-worker.js
+++ b/ui/service-worker.js
@@ -27,6 +27,7 @@ function countUnreadChannels() {
self.addEventListener('push', (event) => {
const data = event.data.json();
+
if (data.type === 'heartbeat') {
// Let's show a notification right away so Safari doesn't tell Apple to be
// mad at us: