diff options
Diffstat (limited to 'src/push')
| -rw-r--r-- | src/push/app.rs | 68 | ||||
| -rw-r--r-- | src/push/handlers/ping/mod.rs | 9 | ||||
| -rw-r--r-- | src/push/handlers/ping/test.rs | 18 | ||||
| -rw-r--r-- | src/push/handlers/subscribe/test.rs | 95 | ||||
| -rw-r--r-- | src/push/mod.rs | 3 | ||||
| -rw-r--r-- | src/push/publisher.rs | 83 |
6 files changed, 138 insertions, 138 deletions
diff --git a/src/push/app.rs b/src/push/app.rs index 2bd6c25..ebfc220 100644 --- a/src/push/app.rs +++ b/src/push/app.rs @@ -1,29 +1,26 @@ -use futures::future::join_all; -use itertools::Itertools as _; use p256::ecdsa::VerifyingKey; use sqlx::SqlitePool; -use web_push::{ - ContentEncoding, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError, - WebPushMessage, WebPushMessageBuilder, -}; +use web_push::{SubscriptionInfo, WebPushError}; use super::repo::Provider as _; use crate::{ db, error::failed::{ErrorExt as _, Failed, ResultExt as _}, + event::Heartbeat, login::Login, + push::publisher::Publish, token::extract::Identity, vapid::repo::Provider as _, }; pub struct Push<P> { db: SqlitePool, - webpush: P, + publisher: P, } impl<P> Push<P> { - pub const fn new(db: SqlitePool, webpush: P) -> Self { - Self { db, webpush } + pub const fn new(db: SqlitePool, publisher: P) -> Self { + Self { db, publisher } } pub async fn subscribe( @@ -79,28 +76,8 @@ impl<P> Push<P> { impl<P> Push<P> where - P: WebPushClient, + P: Publish, { - fn prepare_ping( - signer: &PartialVapidSignatureBuilder, - subscription: &SubscriptionInfo, - ) -> Result<WebPushMessage, PushError> { - let signature = signer - .clone() - .add_sub_info(subscription) - .build() - .fail("Failed to build VAPID signature")?; - - let payload = "ping".as_bytes(); - - let mut message = WebPushMessageBuilder::new(subscription); - message.set_payload(ContentEncoding::Aes128Gcm, payload); - message.set_vapid_signature(signature); - let message = message.build().fail("Failed to build push message")?; - - Ok(message) - } - pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; @@ -116,22 +93,24 @@ where ) })?; - let pings: Vec<_> = subscriptions - .into_iter() - .map(|sub| Self::prepare_ping(&signer, &sub).map(|message| (sub, message))) - .try_collect()?; - - let deliveries = pings - .into_iter() - .map(async |(sub, message)| (sub, self.webpush.send(message).await)); + // We're about to perform some fairly IO-intensive outside actions. Holding a tx open + // across them raises the risk that other clients will encounter errors due to locks from + // this transaction, so release it here. We'll open a new transaction if there's something + // we need to write. + tx.commit().await.fail(db::failed::COMMIT)?; - let failures: Vec<_> = join_all(deliveries) + let failures = self + .publisher + .publish(Heartbeat::Heartbeat, signer, subscriptions) .await - .into_iter() - .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) - .collect(); + .fail("Failed to send push message")?; if !failures.is_empty() { + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + // Note that data integrity guarantees from the original transaction to read + // subscriptions may no longer be valid now. Time has passed. Depending on how slow + // delivering push notifications is, potentially a _lot_ of time has passed. + for (sub, err) in &failures { match err { // I _think_ this is the complete set of permanent failures. See @@ -152,13 +131,12 @@ where } } + tx.commit().await.fail(db::failed::COMMIT)?; + return Err(PushError::Delivery( failures.into_iter().map(|(_, err)| err).collect(), )); } - - tx.commit().await.fail(db::failed::COMMIT)?; - Ok(()) } } diff --git a/src/push/handlers/ping/mod.rs b/src/push/handlers/ping/mod.rs index db828fa..2a86984 100644 --- a/src/push/handlers/ping/mod.rs +++ b/src/push/handlers/ping/mod.rs @@ -1,7 +1,10 @@ use axum::{Json, extract::State, http::StatusCode}; -use web_push::WebPushClient; -use crate::{error::Internal, push::app::Push, token::extract::Identity}; +use crate::{ + error::Internal, + push::{Publish, app::Push}, + token::extract::Identity, +}; #[cfg(test)] mod test; @@ -15,7 +18,7 @@ pub async fn handler<P>( Json(_): Json<Request>, ) -> Result<StatusCode, Internal> where - P: WebPushClient, + P: Publish, { push.ping(&identity.login).await?; diff --git a/src/push/handlers/ping/test.rs b/src/push/handlers/ping/test.rs index 5725131..3481139 100644 --- a/src/push/handlers/ping/test.rs +++ b/src/push/handlers/ping/test.rs @@ -2,8 +2,9 @@ use axum::{ extract::{Json, State}, http::StatusCode, }; +use itertools::Itertools; -use crate::test::fixtures; +use crate::{event::Heartbeat, test::fixtures}; #[tokio::test] async fn ping_without_subscriptions() { @@ -11,18 +12,21 @@ async fn ping_without_subscriptions() { let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) .await .expect("sending a ping with no subscriptions always succeeds"); assert_eq!(StatusCode::ACCEPTED, response); - assert!(app.webpush().sent().is_empty()); + assert!( + app.publisher() + .sent() + .into_iter() + .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) + && publish.subscriptions.is_empty()) + .exactly_one() + .is_ok() + ); } // More complete testing requires that we figure out how to generate working p256 ECDH keys for diff --git a/src/push/handlers/subscribe/test.rs b/src/push/handlers/subscribe/test.rs index 1bc37a4..793bcef 100644 --- a/src/push/handlers/subscribe/test.rs +++ b/src/push/handlers/subscribe/test.rs @@ -3,33 +3,16 @@ use axum::{ http::StatusCode, }; -use crate::{ - push::app::SubscribeError, - test::{fixtures, fixtures::event}, -}; +use crate::{push::app::SubscribeError, test::fixtures}; #[tokio::test] async fn accepts_new_subscription() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - // Find out what that VAPID key is. - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with that key. @@ -41,7 +24,7 @@ async fn accepts_new_subscription() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; let response = super::handler(State(app.push()), subscriber, Json(request)) .await @@ -57,23 +40,9 @@ async fn accepts_repeat_subscription() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - // Find out what that VAPID key is. - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with that key. @@ -85,7 +54,7 @@ async fn accepts_repeat_subscription() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; let response = super::handler(State(app.push()), subscriber.clone(), Json(request.clone())) .await @@ -111,23 +80,9 @@ async fn rejects_duplicate_subscription() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - // Find out what that VAPID key is. - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with that key. @@ -139,7 +94,7 @@ async fn rejects_duplicate_subscription() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; super::handler(State(app.push()), subscriber.clone(), Json(request)) .await @@ -155,7 +110,7 @@ async fn rejects_duplicate_subscription() { auth: String::from("different-test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; let response = super::handler(State(app.push()), subscriber, Json(request)) .await @@ -170,24 +125,7 @@ async fn rejects_duplicate_subscription() { async fn rejects_stale_vapid_key() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - - // Find out what that VAPID key is. - - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let stale_vapid = fixtures::vapid::key(&app).await; // Change the VAPID key. @@ -200,16 +138,7 @@ async fn rejects_stale_vapid_key() { .await .expect("refreshing the VAPID key always succeeds"); - // Find out what the new VAPID key is. - - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let fresh_vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let fresh_vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with the original key. @@ -221,7 +150,7 @@ async fn rejects_stale_vapid_key() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid: stale_vapid, }; let response = super::handler(State(app.push()), subscriber, Json(request)) .await @@ -231,6 +160,6 @@ async fn rejects_stale_vapid_key() { assert!(matches!( response, - super::Error(SubscribeError::StaleVapidKey(key)) if key == fresh_vapid.key + super::Error(SubscribeError::StaleVapidKey(key)) if key == fresh_vapid )); } diff --git a/src/push/mod.rs b/src/push/mod.rs index 1394ea4..042991f 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -1,3 +1,6 @@ pub mod app; pub mod handlers; +mod publisher; pub mod repo; + +pub use publisher::{Publish, Publisher}; diff --git a/src/push/publisher.rs b/src/push/publisher.rs new file mode 100644 index 0000000..4092724 --- /dev/null +++ b/src/push/publisher.rs @@ -0,0 +1,83 @@ +use futures::future::join_all; +use itertools::Itertools as _; +use serde::Serialize; +use web_push::{ + ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo, + WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder, +}; + +use crate::error::failed::{Failed, ResultExt as _}; + +pub trait Publish { + fn publish<M>( + &self, + message: M, + signer: PartialVapidSignatureBuilder, + subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send, + ) -> impl Future<Output = Result<Vec<(SubscriptionInfo, WebPushError)>, Failed>> + Send + where + M: Serialize + Send + 'static; +} + +#[derive(Clone)] +pub struct Publisher { + client: IsahcWebPushClient, +} + +impl Publisher { + pub fn new() -> Result<Self, WebPushError> { + let client = IsahcWebPushClient::new()?; + Ok(Self { client }) + } + + fn prepare_message( + payload: &[u8], + signer: &PartialVapidSignatureBuilder, + subscription: &SubscriptionInfo, + ) -> Result<WebPushMessage, Failed> { + let signature = signer + .clone() + .add_sub_info(subscription) + .build() + .fail("Failed to build VAPID signature")?; + + let mut message = WebPushMessageBuilder::new(subscription); + message.set_payload(ContentEncoding::Aes128Gcm, payload); + message.set_vapid_signature(signature); + let message = message.build().fail("Failed to build push message")?; + + Ok(message) + } +} + +impl Publish for Publisher { + async fn publish<M>( + &self, + message: M, + signer: PartialVapidSignatureBuilder, + subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send, + ) -> Result<Vec<(SubscriptionInfo, WebPushError)>, Failed> + where + M: Serialize + Send + 'static, + { + let payload = serde_json::to_vec_pretty(&message) + .fail("Failed to encode web push message to JSON")?; + + let messages: Vec<_> = subscriptions + .into_iter() + .map(|sub| Self::prepare_message(&payload, &signer, &sub).map(|message| (sub, message))) + .try_collect()?; + + let deliveries = messages + .into_iter() + .map(async |(sub, message)| (sub, self.client.send(message).await)); + + let failures = join_all(deliveries) + .await + .into_iter() + .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) + .collect(); + + Ok(failures) + } +} |
