diff options
Diffstat (limited to 'src/push')
| -rw-r--r-- | src/push/app.rs | 178 | ||||
| -rw-r--r-- | src/push/handlers/mod.rs | 5 | ||||
| -rw-r--r-- | src/push/handlers/ping/mod.rs | 23 | ||||
| -rw-r--r-- | src/push/handlers/ping/test.rs | 40 | ||||
| -rw-r--r-- | src/push/handlers/subscribe/mod.rs | 94 | ||||
| -rw-r--r-- | src/push/handlers/subscribe/test.rs | 236 | ||||
| -rw-r--r-- | src/push/mod.rs | 3 | ||||
| -rw-r--r-- | src/push/repo.rs | 149 |
8 files changed, 728 insertions, 0 deletions
diff --git a/src/push/app.rs b/src/push/app.rs new file mode 100644 index 0000000..56b9a02 --- /dev/null +++ b/src/push/app.rs @@ -0,0 +1,178 @@ +use futures::future::join_all; +use itertools::Itertools as _; +use p256::ecdsa::VerifyingKey; +use sqlx::SqlitePool; +use web_push::{ + ContentEncoding, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError, + WebPushMessage, WebPushMessageBuilder, +}; + +use super::repo::Provider as _; +use crate::{login::Login, token::extract::Identity, vapid, vapid::repo::Provider as _}; + +pub struct Push<P> { + db: SqlitePool, + webpush: P, +} + +impl<P> Push<P> { + pub const fn new(db: SqlitePool, webpush: P) -> Self { + Self { db, webpush } + } + + pub async fn subscribe( + &self, + subscriber: &Identity, + subscription: &SubscriptionInfo, + vapid: &VerifyingKey, + ) -> Result<(), SubscribeError> { + let mut tx = self.db.begin().await?; + + let current = tx.vapid().current().await?; + if vapid != ¤t.key { + return Err(SubscribeError::StaleVapidKey(current.key)); + } + + match tx.push().create(&subscriber.token, subscription).await { + Ok(()) => (), + Err(err) => { + if let Some(err) = err.as_database_error() + && err.is_unique_violation() + { + let current = tx + .push() + .by_endpoint(&subscriber.login, &subscription.endpoint) + .await?; + // If we already have a subscription for this endpoint, with _different_ + // parameters, then this is a client error. They shouldn't reuse endpoint URLs, + // per the various RFCs. + // + // However, if we have a subscription for this endpoint with the same parameters + // then we accept it and silently do nothing. This may happen if, for example, + // the subscribe request is retried due to a network interruption where it's + // not clear whether the original request succeeded. + if ¤t != subscription { + return Err(SubscribeError::Duplicate); + } + } else { + return Err(SubscribeError::Database(err)); + } + } + } + + tx.commit().await?; + + Ok(()) + } +} + +impl<P> Push<P> +where + P: WebPushClient, +{ + fn prepare_ping( + signer: &PartialVapidSignatureBuilder, + subscription: &SubscriptionInfo, + ) -> Result<WebPushMessage, WebPushError> { + let signature = signer.clone().add_sub_info(subscription).build()?; + + let payload = "ping".as_bytes(); + + let mut message = WebPushMessageBuilder::new(subscription); + message.set_payload(ContentEncoding::Aes128Gcm, payload); + message.set_vapid_signature(signature); + let message = message.build()?; + + Ok(message) + } + + pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> { + let mut tx = self.db.begin().await?; + + let signer = tx.vapid().signer().await?; + let subscriptions = tx.push().by_login(recipient).await?; + + let pings: Vec<_> = subscriptions + .into_iter() + .map(|sub| Self::prepare_ping(&signer, &sub).map(|message| (sub, message))) + .try_collect()?; + + let deliveries = pings + .into_iter() + .map(async |(sub, message)| (sub, self.webpush.send(message).await)); + + let failures: Vec<_> = join_all(deliveries) + .await + .into_iter() + .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) + .collect(); + + if !failures.is_empty() { + for (sub, err) in &failures { + match err { + // I _think_ this is the complete set of permanent failures. See + // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete + // list. + WebPushError::Unauthorized(_) + | WebPushError::InvalidUri + | WebPushError::EndpointNotValid(_) + | WebPushError::EndpointNotFound(_) + | WebPushError::InvalidCryptoKeys + | WebPushError::MissingCryptoKeys => { + tx.push().unsubscribe(sub).await?; + } + _ => (), + } + } + + return Err(PushError::Delivery( + failures.into_iter().map(|(_, err)| err).collect(), + )); + } + + tx.commit().await?; + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum SubscribeError { + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Vapid(#[from] vapid::repo::Error), + #[error("subscription created with stale VAPID key")] + StaleVapidKey(VerifyingKey), + #[error("subscription already exists for endpoint")] + // The endpoint URL is not included in the error, as it is a bearer credential in its own right + // and we want to limit its proliferation. The only intended recipient of this message is the + // client, which already knows the endpoint anyways and doesn't need us to tell them. + Duplicate, +} + +#[derive(Debug, thiserror::Error)] +pub enum PushError { + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Ecdsa(#[from] p256::ecdsa::Error), + #[error(transparent)] + Pkcs8(#[from] p256::pkcs8::Error), + #[error(transparent)] + WebPush(#[from] WebPushError), + #[error("push message delivery failures: {0:?}")] + Delivery(Vec<WebPushError>), +} + +impl From<vapid::repo::Error> for PushError { + fn from(error: vapid::repo::Error) -> Self { + use vapid::repo::Error; + match error { + Error::Database(error) => error.into(), + Error::Ecdsa(error) => error.into(), + Error::Pkcs8(error) => error.into(), + Error::WebPush(error) => error.into(), + } + } +} diff --git a/src/push/handlers/mod.rs b/src/push/handlers/mod.rs new file mode 100644 index 0000000..bb58774 --- /dev/null +++ b/src/push/handlers/mod.rs @@ -0,0 +1,5 @@ +mod ping; +mod subscribe; + +pub use ping::handler as ping; +pub use subscribe::handler as subscribe; diff --git a/src/push/handlers/ping/mod.rs b/src/push/handlers/ping/mod.rs new file mode 100644 index 0000000..db828fa --- /dev/null +++ b/src/push/handlers/ping/mod.rs @@ -0,0 +1,23 @@ +use axum::{Json, extract::State, http::StatusCode}; +use web_push::WebPushClient; + +use crate::{error::Internal, push::app::Push, token::extract::Identity}; + +#[cfg(test)] +mod test; + +#[derive(serde::Deserialize)] +pub struct Request {} + +pub async fn handler<P>( + State(push): State<Push<P>>, + identity: Identity, + Json(_): Json<Request>, +) -> Result<StatusCode, Internal> +where + P: WebPushClient, +{ + push.ping(&identity.login).await?; + + Ok(StatusCode::ACCEPTED) +} diff --git a/src/push/handlers/ping/test.rs b/src/push/handlers/ping/test.rs new file mode 100644 index 0000000..5725131 --- /dev/null +++ b/src/push/handlers/ping/test.rs @@ -0,0 +1,40 @@ +use axum::{ + extract::{Json, State}, + http::StatusCode, +}; + +use crate::test::fixtures; + +#[tokio::test] +async fn ping_without_subscriptions() { + let app = fixtures::scratch_app().await; + + let recipient = fixtures::identity::create(&app, &fixtures::now()).await; + + app.vapid() + .refresh_key(&fixtures::now()) + .await + .expect("refreshing the VAPID key always succeeds"); + + let response = super::handler(State(app.push()), recipient, Json(super::Request {})) + .await + .expect("sending a ping with no subscriptions always succeeds"); + + assert_eq!(StatusCode::ACCEPTED, response); + + assert!(app.webpush().sent().is_empty()); +} + +// More complete testing requires that we figure out how to generate working p256 ECDH keys for +// testing _with_, as `web_push` will actually parse and use those keys even if push messages are +// ultimately never serialized or sent over HTTP. +// +// Tests that are missing: +// +// * Verify that subscribing and sending a ping causes a ping to be delivered to that subscription. +// * Verify that two subscriptions both get pings. +// * Verify that other users' subscriptions are not pinged. +// * Verify that a ping that causes a permanent error causes the subscription to be deleted. +// * Verify that a ping that causes a non-permanent error does not cause the subscription to be +// deleted. +// * Verify that a failure on one subscription doesn't affect delivery on other subscriptions. diff --git a/src/push/handlers/subscribe/mod.rs b/src/push/handlers/subscribe/mod.rs new file mode 100644 index 0000000..a1a5899 --- /dev/null +++ b/src/push/handlers/subscribe/mod.rs @@ -0,0 +1,94 @@ +use axum::{ + extract::{Json, State}, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use p256::ecdsa::VerifyingKey; +use web_push::SubscriptionInfo; + +use crate::{ + error::Internal, + push::{app, app::Push}, + token::extract::Identity, +}; + +#[cfg(test)] +mod test; + +#[derive(Clone, serde::Deserialize)] +pub struct Request { + subscription: Subscription, + #[serde(with = "crate::vapid::ser::key")] + vapid: VerifyingKey, +} + +// This structure is described in <https://w3c.github.io/push-api/#dom-pushsubscription-tojson>. +#[derive(Clone, serde::Deserialize)] +pub struct Subscription { + endpoint: String, + keys: Keys, +} + +// This structure is described in <https://w3c.github.io/push-api/#dom-pushsubscription-tojson>. +#[derive(Clone, serde::Deserialize)] +pub struct Keys { + p256dh: String, + auth: String, +} + +pub async fn handler<P>( + State(push): State<Push<P>>, + identity: Identity, + Json(request): Json<Request>, +) -> Result<StatusCode, Error> { + let Request { + subscription, + vapid, + } = request; + + push.subscribe(&identity, &subscription.into(), &vapid) + .await?; + + Ok(StatusCode::CREATED) +} + +impl From<Subscription> for SubscriptionInfo { + fn from(request: Subscription) -> Self { + let Subscription { + endpoint, + keys: Keys { p256dh, auth }, + } = request; + SubscriptionInfo::new(endpoint, p256dh, auth) + } +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct Error(#[from] app::SubscribeError); + +impl IntoResponse for Error { + fn into_response(self) -> Response { + let Self(err) = self; + + match err { + app::SubscribeError::StaleVapidKey(key) => { + let body = StaleVapidKey { + message: err.to_string(), + key, + }; + (StatusCode::BAD_REQUEST, Json(body)).into_response() + } + app::SubscribeError::Duplicate => { + (StatusCode::CONFLICT, err.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} + +#[derive(serde::Serialize)] +struct StaleVapidKey { + message: String, + #[serde(with = "crate::vapid::ser::key")] + key: VerifyingKey, +} diff --git a/src/push/handlers/subscribe/test.rs b/src/push/handlers/subscribe/test.rs new file mode 100644 index 0000000..b72624d --- /dev/null +++ b/src/push/handlers/subscribe/test.rs @@ -0,0 +1,236 @@ +use axum::{ + extract::{Json, State}, + http::StatusCode, +}; + +use crate::{ + push::app::SubscribeError, + test::{fixtures, fixtures::event}, +}; + +#[tokio::test] +async fn accepts_new_subscription() { + let app = fixtures::scratch_app().await; + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + // Issue a VAPID key. + + app.vapid() + .refresh_key(&fixtures::now()) + .await + .expect("refreshing the VAPID key always succeeds"); + + // Find out what that VAPID key is. + + let boot = app.boot().snapshot().await.expect("boot always succeeds"); + let vapid = boot + .events + .into_iter() + .filter_map(event::vapid) + .filter_map(event::vapid::changed) + .next_back() + .expect("the application will have a vapid key after a refresh"); + + // Create a dummy subscription with that key. + + let request = super::Request { + subscription: super::Subscription { + endpoint: String::from("https://push.example.com/endpoint"), + keys: super::Keys { + p256dh: String::from("test-p256dh-value"), + auth: String::from("test-auth-value"), + }, + }, + vapid: vapid.key, + }; + let response = super::handler(State(app.push()), subscriber, Json(request)) + .await + .expect("test request will succeed on a fresh app"); + + // Check that the response looks as expected. + + assert_eq!(StatusCode::CREATED, response); +} + +#[tokio::test] +async fn accepts_repeat_subscription() { + let app = fixtures::scratch_app().await; + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + // Issue a VAPID key. + + app.vapid() + .refresh_key(&fixtures::now()) + .await + .expect("refreshing the VAPID key always succeeds"); + + // Find out what that VAPID key is. + + let boot = app.boot().snapshot().await.expect("boot always succeeds"); + let vapid = boot + .events + .into_iter() + .filter_map(event::vapid) + .filter_map(event::vapid::changed) + .next_back() + .expect("the application will have a vapid key after a refresh"); + + // Create a dummy subscription with that key. + + let request = super::Request { + subscription: super::Subscription { + endpoint: String::from("https://push.example.com/endpoint"), + keys: super::Keys { + p256dh: String::from("test-p256dh-value"), + auth: String::from("test-auth-value"), + }, + }, + vapid: vapid.key, + }; + let response = super::handler(State(app.push()), subscriber.clone(), Json(request.clone())) + .await + .expect("test request will succeed on a fresh app"); + + // Check that the response looks as expected. + + assert_eq!(StatusCode::CREATED, response); + + // Repeat the request + + let response = super::handler(State(app.push()), subscriber, Json(request)) + .await + .expect("test request will succeed twice on a fresh app"); + + // Check that the second response also looks as expected. + + assert_eq!(StatusCode::CREATED, response); +} + +#[tokio::test] +async fn rejects_duplicate_subscription() { + let app = fixtures::scratch_app().await; + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + // Issue a VAPID key. + + app.vapid() + .refresh_key(&fixtures::now()) + .await + .expect("refreshing the VAPID key always succeeds"); + + // Find out what that VAPID key is. + + let boot = app.boot().snapshot().await.expect("boot always succeeds"); + let vapid = boot + .events + .into_iter() + .filter_map(event::vapid) + .filter_map(event::vapid::changed) + .next_back() + .expect("the application will have a vapid key after a refresh"); + + // Create a dummy subscription with that key. + + let request = super::Request { + subscription: super::Subscription { + endpoint: String::from("https://push.example.com/endpoint"), + keys: super::Keys { + p256dh: String::from("test-p256dh-value"), + auth: String::from("test-auth-value"), + }, + }, + vapid: vapid.key, + }; + super::handler(State(app.push()), subscriber.clone(), Json(request)) + .await + .expect("test request will succeed on a fresh app"); + + // Repeat the request with different keys + + let request = super::Request { + subscription: super::Subscription { + endpoint: String::from("https://push.example.com/endpoint"), + keys: super::Keys { + p256dh: String::from("different-test-p256dh-value"), + auth: String::from("different-test-auth-value"), + }, + }, + vapid: vapid.key, + }; + let response = super::handler(State(app.push()), subscriber, Json(request)) + .await + .expect_err("request with duplicate endpoint should fail"); + + // Make sure we got the error we expected. + + assert!(matches!(response, super::Error(SubscribeError::Duplicate))); +} + +#[tokio::test] +async fn rejects_stale_vapid_key() { + let app = fixtures::scratch_app().await; + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + + // Issue a VAPID key. + + app.vapid() + .refresh_key(&fixtures::now()) + .await + .expect("refreshing the VAPID key always succeeds"); + + // Find out what that VAPID key is. + + let boot = app.boot().snapshot().await.expect("boot always succeeds"); + let vapid = boot + .events + .into_iter() + .filter_map(event::vapid) + .filter_map(event::vapid::changed) + .next_back() + .expect("the application will have a vapid key after a refresh"); + + // Change the VAPID key. + + app.vapid() + .rotate_key() + .await + .expect("key rotation always succeeds"); + app.vapid() + .refresh_key(&fixtures::now()) + .await + .expect("refreshing the VAPID key always succeeds"); + + // Find out what the new VAPID key is. + + let boot = app.boot().snapshot().await.expect("boot always succeeds"); + let fresh_vapid = boot + .events + .into_iter() + .filter_map(event::vapid) + .filter_map(event::vapid::changed) + .next_back() + .expect("the application will have a vapid key after a refresh"); + + // Create a dummy subscription with the original key. + + let request = super::Request { + subscription: super::Subscription { + endpoint: String::from("https://push.example.com/endpoint"), + keys: super::Keys { + p256dh: String::from("test-p256dh-value"), + auth: String::from("test-auth-value"), + }, + }, + vapid: vapid.key, + }; + let response = super::handler(State(app.push()), subscriber, Json(request)) + .await + .expect_err("test request has a stale vapid key"); + + // Check that the response looks as expected. + + assert!(matches!( + response, + super::Error(SubscribeError::StaleVapidKey(key)) if key == fresh_vapid.key + )); +} diff --git a/src/push/mod.rs b/src/push/mod.rs new file mode 100644 index 0000000..1394ea4 --- /dev/null +++ b/src/push/mod.rs @@ -0,0 +1,3 @@ +pub mod app; +pub mod handlers; +pub mod repo; diff --git a/src/push/repo.rs b/src/push/repo.rs new file mode 100644 index 0000000..4183489 --- /dev/null +++ b/src/push/repo.rs @@ -0,0 +1,149 @@ +use sqlx::{Sqlite, SqliteConnection, Transaction}; +use web_push::SubscriptionInfo; + +use crate::{login::Login, token::Token}; + +pub trait Provider { + fn push(&mut self) -> Push<'_>; +} + +impl Provider for Transaction<'_, Sqlite> { + fn push(&mut self) -> Push<'_> { + Push(self) + } +} + +pub struct Push<'t>(&'t mut SqliteConnection); + +impl Push<'_> { + pub async fn create( + &mut self, + token: &Token, + subscription: &SubscriptionInfo, + ) -> Result<(), sqlx::Error> { + sqlx::query!( + r#" + insert into push_subscription (token, endpoint, p256dh, auth) + values ($1, $2, $3, $4) + "#, + token.id, + subscription.endpoint, + subscription.keys.p256dh, + subscription.keys.auth, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) + } + + pub async fn by_login(&mut self, login: &Login) -> Result<Vec<SubscriptionInfo>, sqlx::Error> { + sqlx::query!( + r#" + select + subscription.endpoint, + subscription.p256dh, + subscription.auth + from push_subscription as subscription + join token on subscription.token = token.id + where token.login = $1 + "#, + login.id, + ) + .map(|row| SubscriptionInfo::new(row.endpoint, row.p256dh, row.auth)) + .fetch_all(&mut *self.0) + .await + } + + pub async fn by_endpoint( + &mut self, + subscriber: &Login, + endpoint: &str, + ) -> Result<SubscriptionInfo, sqlx::Error> { + let row = sqlx::query!( + r#" + select + subscription.endpoint, + subscription.p256dh, + subscription.auth + from push_subscription as subscription + join token on subscription.token = token.id + join login as subscriber on token.login = subscriber.id + where subscriber.id = $1 + and subscription.endpoint = $2 + "#, + subscriber.id, + endpoint, + ) + .fetch_one(&mut *self.0) + .await?; + + let info = SubscriptionInfo::new(row.endpoint, row.p256dh, row.auth); + + Ok(info) + } + + pub async fn unsubscribe( + &mut self, + subscription: &SubscriptionInfo, + ) -> Result<(), sqlx::Error> { + sqlx::query!( + r#" + delete from push_subscription + where endpoint = $1 + "#, + subscription.endpoint, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) + } + + pub async fn unsubscribe_token(&mut self, token: &Token) -> Result<(), sqlx::Error> { + sqlx::query!( + r#" + delete from push_subscription + where token = $1 + "#, + token.id, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) + } + + pub async fn unsubscribe_login(&mut self, login: &Login) -> Result<(), sqlx::Error> { + sqlx::query!( + r#" + with tokens as ( + select id from token + where login = $1 + ) + delete from push_subscription + where token in tokens + "#, + login.id, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) + } + + // Unsubscribe logic for token expiry lives in the `tokens` repository, for maintenance reasons. + + pub async fn clear(&mut self) -> Result<(), sqlx::Error> { + // We assume that _all_ stored subscriptions are for a VAPID key we're about to delete. + sqlx::query!( + r#" + delete from push_subscription + "#, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) + } +} |
