diff options
| -rw-r--r-- | src/cli.rs | 2 | ||||
| -rw-r--r-- | src/message/app.rs | 32 | ||||
| -rw-r--r-- | src/push/app.rs | 48 | ||||
| -rw-r--r-- | src/push/handlers/ping/test.rs | 200 | ||||
| -rw-r--r-- | src/push/publisher.rs | 66 | ||||
| -rw-r--r-- | src/test/webpush.rs | 29 | ||||
| -rw-r--r-- | ui/service-worker.js | 1 |
7 files changed, 69 insertions, 309 deletions
@@ -101,7 +101,7 @@ impl Args { self.umask.set(); let pool = self.pool().await.fail("Failed to create database pool")?; - let publisher = Publisher::new().fail("Failed to create web push publisher")?; + let publisher = Publisher::new(pool.clone()).fail("Failed to create web push publisher")?; let app = App::from(pool, publisher); match self.command { diff --git a/src/message/app.rs b/src/message/app.rs index b79dad0..4bf96bf 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -1,7 +1,6 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use web_push::WebPushError; use super::{Body, History, Id, Message, history, repo::Provider as _}; use crate::{ @@ -184,39 +183,10 @@ where self.events.broadcast_from(events.clone()); for event in events { - let failures = self - .publisher + self.publisher .publish(Event::from(event), &signer, &push_recipients) .await .fail("Failed to publish push events")?; - - if !failures.is_empty() { - let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; - // Note that data integrity guarantees from the original transaction to read - // subscriptions may no longer be valid now. Time has passed. Depending on how slow - // delivering push notifications is, potentially a _lot_ of time has passed. - - for (sub, err) in failures { - match err { - // I _think_ this is the complete set of permanent failures. See - // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete - // list. - WebPushError::Unauthorized(_) - | WebPushError::InvalidUri - | WebPushError::EndpointNotValid(_) - | WebPushError::EndpointNotFound(_) - | WebPushError::InvalidCryptoKeys - | WebPushError::MissingCryptoKeys => { - tx.push().unsubscribe(sub).await.fail( - "Failed to unsubscribe after permanent push message rejection", - )?; - } - _ => (), - } - } - - tx.commit().await.fail(db::failed::COMMIT)?; - } } Ok(message.as_sent()) diff --git a/src/push/app.rs b/src/push/app.rs index f7846a6..1983055 100644 --- a/src/push/app.rs +++ b/src/push/app.rs @@ -1,6 +1,6 @@ use p256::ecdsa::VerifyingKey; use sqlx::SqlitePool; -use web_push::{SubscriptionInfo, WebPushError}; +use web_push::SubscriptionInfo; use super::repo::Provider as _; use crate::{ @@ -78,7 +78,7 @@ impl<P> Push<P> where P: Publish, { - pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> { + pub async fn ping(&self, recipient: &Login) -> Result<(), Failed> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; let signer = tx @@ -99,44 +99,10 @@ where // we need to write. tx.commit().await.fail(db::failed::COMMIT)?; - let failures = self - .publisher + self.publisher .publish(Heartbeat::Heartbeat, &signer, &subscriptions) .await .fail("Failed to send push message")?; - - if !failures.is_empty() { - let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; - // Note that data integrity guarantees from the original transaction to read - // subscriptions may no longer be valid now. Time has passed. Depending on how slow - // delivering push notifications is, potentially a _lot_ of time has passed. - - for (sub, err) in &failures { - match err { - // I _think_ this is the complete set of permanent failures. See - // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete - // list. - WebPushError::Unauthorized(_) - | WebPushError::InvalidUri - | WebPushError::EndpointNotValid(_) - | WebPushError::EndpointNotFound(_) - | WebPushError::InvalidCryptoKeys - | WebPushError::MissingCryptoKeys => { - tx.push() - .unsubscribe(sub) - .await - .fail("Failed to unsubscribe after permanent push message rejection")?; - } - _ => (), - } - } - - tx.commit().await.fail(db::failed::COMMIT)?; - - return Err(PushError::Delivery( - failures.into_iter().map(|(_, err)| err).collect(), - )); - } Ok(()) } } @@ -153,11 +119,3 @@ pub enum SubscribeError { #[error(transparent)] Failed(#[from] Failed), } - -#[derive(Debug, thiserror::Error)] -pub enum PushError { - #[error("push message delivery failures: {0:?}")] - Delivery(Vec<WebPushError>), - #[error(transparent)] - Failed(#[from] Failed), -} diff --git a/src/push/handlers/ping/test.rs b/src/push/handlers/ping/test.rs index cc07ef0..70ba4bf 100644 --- a/src/push/handlers/ping/test.rs +++ b/src/push/handlers/ping/test.rs @@ -172,203 +172,3 @@ async fn ping_recipient_only() { .any(|publish| publish.recipients.contains(&spectator_subscription)) ); } - -#[tokio::test] -async fn ping_permanent_error() { - let app = fixtures::scratch_app().await; - - let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - - let vapid = fixtures::vapid::key(&app).await; - - // Create subscriptions - let subscription = SubscriptionInfo::new( - "https://push.example.com/recipient/endpoint", - "recipient-p256dh-key", - "recipient-auth", - ); - app.push() - .subscribe(&recipient, &subscription, &vapid) - .await - .expect("creating a subscription succeeds"); - - // Prepare the next ping attempt to fail - app.publisher() - .fail_next(&subscription, WebPushError::InvalidUri); - - // Send a ping - super::handler( - State(app.push()), - recipient.clone(), - Json(super::Request {}), - ) - .await - .expect_err("sending a ping with a permanently-failing subscription fails"); - - // Confirm that it was actually sent - let sent = app.publisher().sent(); - - let subscriptions = HashSet::from([subscription.clone()]); - assert!( - sent.iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); - - // Send a second ping - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) - .await - .expect("sending a ping with no subscriptions always succeeds"); - - assert_eq!(StatusCode::ACCEPTED, response); - - // Confirm that it was _not_ sent this time, since it failed permanently last time - let sent = app.publisher().sent(); - - assert!( - !sent - .iter() - .any(|publish| publish.recipients.contains(&subscription)) - ); -} - -#[tokio::test] -async fn ping_temporary_error() { - let app = fixtures::scratch_app().await; - - let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - - let vapid = fixtures::vapid::key(&app).await; - - // Create subscriptions - let subscription = SubscriptionInfo::new( - "https://push.example.com/recipient/endpoint", - "recipient-p256dh-key", - "recipient-auth", - ); - app.push() - .subscribe(&recipient, &subscription, &vapid) - .await - .expect("creating a subscription succeeds"); - - // Prepare the next ping attempt to fail - app.publisher().fail_next( - &subscription, - WebPushError::from(io::Error::other("transient IO error")), - ); - - // Send a ping - super::handler( - State(app.push()), - recipient.clone(), - Json(super::Request {}), - ) - .await - .expect_err("sending a ping with a temporarily-failing subscription fails"); - - // Confirm that it was actually sent - let sent = app.publisher().sent(); - - let subscriptions = HashSet::from([subscription.clone()]); - assert!( - sent.iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); - - // Send a second ping - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) - .await - .expect("sending a ping subscription succeeds now that the transient error has cleared"); - assert_eq!(StatusCode::ACCEPTED, response); - - // Confirm that it was _not_ sent this time, since it failed permanently last time - let sent = app.publisher().sent(); - - let subscriptions = HashSet::from([subscription]); - assert!( - sent.iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); -} - -#[tokio::test] -async fn ping_multiple_subscriptions_with_failure() { - let app = fixtures::scratch_app().await; - - let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - let vapid = fixtures::vapid::key(&app).await; - - // Create subscriptions - let failing = SubscriptionInfo::new( - "https://push.example.com/endpoint-failing", - "testing-p256dh-key-failing", - "testing-auth-failing", - ); - let succeeding = SubscriptionInfo::new( - "https://push.example.com/endpoint-succeeding", - "testing-p256dh-key-succeeding", - "testing-auth-succeeding", - ); - let subscriptions = HashSet::from([failing.clone(), succeeding.clone()]); - for subscription in &subscriptions { - app.push() - .subscribe(&recipient, subscription, &vapid) - .await - .expect("creating a subscription succeeds"); - } - - // Rig one of them to fail permanently - app.publisher() - .fail_next(&failing, WebPushError::InvalidUri); - - // Send a ping - super::handler( - State(app.push()), - recipient.clone(), - Json(super::Request {}), - ) - .await - .expect_err("sending a ping with a failing subscription fails"); - - // Confirm that it was actually sent to both subs the first time - let subscriptions = HashSet::from([failing.clone(), succeeding.clone()]); - assert!( - app.publisher() - .sent() - .iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); - - // Send a second ping - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) - .await - .expect("sending a second ping succeeds after the failing subscription is removed"); - assert_eq!(StatusCode::ACCEPTED, response); - - // Confirm that it was only sent to the succeeding subscription - let subscriptions = HashSet::from([succeeding]); - let sent = app.publisher().sent(); - assert!( - sent.iter() - .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.recipients == subscriptions) - .exactly_one() - .is_ok() - ); - assert!( - !sent - .iter() - .any(|publish| publish.recipients.contains(&failing)) - ); -} diff --git a/src/push/publisher.rs b/src/push/publisher.rs index d6227a2..ef23f2f 100644 --- a/src/push/publisher.rs +++ b/src/push/publisher.rs @@ -1,34 +1,40 @@ use futures::future::join_all; use itertools::Itertools as _; use serde::Serialize; +use sqlx::SqlitePool; use web_push::{ ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder, }; -use crate::error::failed::{Failed, ResultExt as _}; +use crate::{ + db, + error::failed::{Failed, ResultExt as _}, + push::repo::Provider, +}; #[async_trait::async_trait] pub trait Publish { - async fn publish<'s, M>( + async fn publish<M>( &self, message: M, signer: &PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send, - ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed> + subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send, + ) -> Result<(), Failed> where M: Serialize + Send + 'static; } #[derive(Clone)] pub struct Publisher { + db: SqlitePool, client: IsahcWebPushClient, } impl Publisher { - pub fn new() -> Result<Self, WebPushError> { + pub fn new(db: SqlitePool) -> Result<Self, WebPushError> { let client = IsahcWebPushClient::new()?; - Ok(Self { client }) + Ok(Self { db, client }) } fn prepare_message( @@ -49,16 +55,52 @@ impl Publisher { Ok(message) } + + async fn settle_failed( + &self, + failures: Vec<(&SubscriptionInfo, WebPushError)>, + ) -> Result<(), Failed> { + if !failures.is_empty() { + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + // Note that data integrity guarantees from whatever transaction originally read the + // subscriptions may no longer be valid now. Time has passed. Depending on how slow + // delivering push notifications is, potentially a _lot_ of time has passed. + + for (sub, err) in &failures { + match err { + // I _think_ this is the complete set of permanent failures. See + // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete + // list. + WebPushError::Unauthorized(_) + | WebPushError::InvalidUri + | WebPushError::EndpointNotValid(_) + | WebPushError::EndpointNotFound(_) + | WebPushError::InvalidCryptoKeys + | WebPushError::MissingCryptoKeys => { + tx.push() + .unsubscribe(sub) + .await + .fail("Failed to unsubscribe after permanent push message rejection")?; + } + _ => (), + } + } + + tx.commit().await.fail(db::failed::COMMIT)?; + } + + Ok(()) + } } #[async_trait::async_trait] impl Publish for Publisher { - async fn publish<'s, M>( + async fn publish<M>( &self, message: M, signer: &PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send, - ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed> + subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send, + ) -> Result<(), Failed> where M: Serialize + Send + 'static, { @@ -74,12 +116,14 @@ impl Publish for Publisher { .into_iter() .map(async |(sub, message)| (sub, self.client.send(message).await)); - let failures = join_all(deliveries) + let failures: Vec<_> = join_all(deliveries) .await .into_iter() .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) .collect(); - Ok(failures) + self.settle_failed(failures).await?; + + Ok(()) } } diff --git a/src/test/webpush.rs b/src/test/webpush.rs index 55caf19..656f66a 100644 --- a/src/test/webpush.rs +++ b/src/test/webpush.rs @@ -1,11 +1,11 @@ use std::{ any::Any, - collections::{HashMap, HashSet}, + collections::HashSet, mem, sync::{Arc, Mutex, MutexGuard}, }; -use web_push::{PartialVapidSignatureBuilder, SubscriptionInfo, WebPushError}; +use web_push::{PartialVapidSignatureBuilder, SubscriptionInfo}; use crate::{error::failed::Failed, push::Publish}; @@ -15,7 +15,6 @@ pub struct Client(Arc<Mutex<ClientInner>>); #[derive(Default)] struct ClientInner { sent: Vec<Publication>, - planned_failures: HashMap<SubscriptionInfo, WebPushError>, } impl Client { @@ -33,35 +32,23 @@ impl Client { let sent = &mut self.inner().sent; mem::take(sent) } - - pub fn fail_next(&self, subscription_info: &SubscriptionInfo, err: WebPushError) { - let planned_failures = &mut self.inner().planned_failures; - planned_failures.insert(subscription_info.clone(), err); - } } #[async_trait::async_trait] impl Publish for Client { - async fn publish<'s, M>( + async fn publish<M>( &self, message: M, _: &PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send, - ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed> + subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send, + ) -> Result<(), Failed> where M: Send + 'static, { let mut inner = self.inner(); - let message: Box<dyn Any + Send> = Box::new(message); - let mut recipients = HashSet::new(); - let mut failures = Vec::new(); - for subscription in subscriptions { - recipients.insert(subscription.clone()); - if let Some(err) = inner.planned_failures.remove(subscription) { - failures.push((subscription, err)); - } - } + let message: Box<dyn Any + Send> = Box::new(message); + let recipients = subscriptions.into_iter().cloned().collect(); let publication = Publication { message, @@ -69,7 +56,7 @@ impl Publish for Client { }; inner.sent.push(publication); - Ok(failures) + Ok(()) } } diff --git a/ui/service-worker.js b/ui/service-worker.js index 81d0752..8a889ec 100644 --- a/ui/service-worker.js +++ b/ui/service-worker.js @@ -27,6 +27,7 @@ function countUnreadChannels() { self.addEventListener('push', (event) => { const data = event.data.json(); + if (data.type === 'heartbeat') { // Let's show a notification right away so Safari doesn't tell Apple to be // mad at us: |
