diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-12-09 15:13:21 -0500 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-12-17 15:48:20 -0500 |
| commit | 3c697f5fb1b8dbad46eac8fa299ed7cebfb36159 (patch) | |
| tree | b7854fb23d1e104f928acfe3bba75ea3b74b83d9 /src | |
| parent | 41a5a0f7e13bf5a82aaef59e34eb68f0fe7fa7f5 (diff) | |
Factor push message publication out to its own helper component.
The `Publisher` component handles the details of web push delivery. Callers must provide the subscription set, the current signer, and the message, while the publisher handles encoding and communication with web push endpoints.
To facilitate testing, `Publisher` implements `Publish`, which is a new trait with the same interface. Components that might publish web push messages should rely on the trait where possible. The test suite now constructs an app with a dummy `Publish` impl, which captures push messages for examination.
Note that the testing implementation of `Publish` is hand-crafted, and presently only acts to record the arguments it receives. The other alternative was to use a mocking library, such as `mockit`, and while I've used that approach before, I'm not super comfortable with the complexity in this situation. I think we can maintain a more reasonable testing `Publish` impl by hand, at least for now, and we can revisit that decision later if need be.
Tests for the `ping` endpoint have been migrated to this endpoint.
Diffstat (limited to 'src')
| -rw-r--r-- | src/app.rs | 18 | ||||
| -rw-r--r-- | src/boot/handlers/boot/test.rs | 14 | ||||
| -rw-r--r-- | src/cli.rs | 11 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/vapid.rs | 6 | ||||
| -rw-r--r-- | src/event/mod.rs | 2 | ||||
| -rw-r--r-- | src/push/app.rs | 68 | ||||
| -rw-r--r-- | src/push/handlers/ping/mod.rs | 9 | ||||
| -rw-r--r-- | src/push/handlers/ping/test.rs | 18 | ||||
| -rw-r--r-- | src/push/handlers/subscribe/test.rs | 95 | ||||
| -rw-r--r-- | src/push/mod.rs | 3 | ||||
| -rw-r--r-- | src/push/publisher.rs | 83 | ||||
| -rw-r--r-- | src/routes.rs | 8 | ||||
| -rw-r--r-- | src/test/fixtures/mod.rs | 12 | ||||
| -rw-r--r-- | src/test/fixtures/vapid.rs | 16 | ||||
| -rw-r--r-- | src/test/webpush.rs | 55 |
15 files changed, 235 insertions, 183 deletions
@@ -19,18 +19,18 @@ use crate::{ #[derive(Clone)] pub struct App<P> { db: SqlitePool, - webpush: P, + publisher: P, events: event::Broadcaster, token_events: token::Broadcaster, } impl<P> App<P> { - pub fn from(db: SqlitePool, webpush: P) -> Self { + pub fn from(db: SqlitePool, publisher: P) -> Self { let events = event::Broadcaster::default(); let token_events = token::Broadcaster::default(); Self { db, - webpush, + publisher, events, token_events, } @@ -62,11 +62,16 @@ impl<P> App<P> { Messages::new(self.db.clone(), self.events.clone()) } + #[cfg(test)] + pub fn publisher(&self) -> &P { + &self.publisher + } + pub fn push(&self) -> Push<P> where P: Clone, { - Push::new(self.db.clone(), self.webpush.clone()) + Push::new(self.db.clone(), self.publisher.clone()) } pub fn setup(&self) -> Setup { @@ -85,11 +90,6 @@ impl<P> App<P> { pub fn vapid(&self) -> Vapid { Vapid::new(self.db.clone(), self.events.clone()) } - - #[cfg(test)] - pub fn webpush(&self) -> &P { - &self.webpush - } } impl<P> FromRef<App<P>> for Boot { diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs index f192478..0aef694 100644 --- a/src/boot/handlers/boot/test.rs +++ b/src/boot/handlers/boot/test.rs @@ -84,11 +84,6 @@ async fn includes_messages() { async fn includes_vapid_key() { let app = fixtures::scratch_app().await; - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("key rotation always succeeds"); - let viewer = fixtures::identity::fictitious(); let response = super::handler(State(app.boot()), viewer) .await @@ -108,11 +103,6 @@ async fn includes_vapid_key() { async fn includes_only_latest_vapid_key() { let app = fixtures::scratch_app().await; - app.vapid() - .refresh_key(&fixtures::ancient()) - .await - .expect("key rotation always succeeds"); - let viewer = fixtures::identity::fictitious(); let response = super::handler(State(app.boot()), viewer.clone()) .await @@ -128,6 +118,10 @@ async fn includes_only_latest_vapid_key() { .expect("only one vapid key has been created"); app.vapid() + .revoke_key() + .await + .expect("key revocation always succeeds"); + app.vapid() .refresh_key(&fixtures::now()) .await .expect("key rotation always succeeds"); @@ -13,13 +13,13 @@ use axum::{ use clap::{CommandFactory, Parser, Subcommand}; use sqlx::sqlite::SqlitePool; use tokio::net; -use web_push::{IsahcWebPushClient, WebPushClient}; pub use crate::exit::Exit; use crate::{ app::App, clock, db, error::failed::{Failed, ResultExt as _}, + push::Publisher, routes, umask::Umask, }; @@ -101,8 +101,8 @@ impl Args { self.umask.set(); let pool = self.pool().await.fail("Failed to create database pool")?; - let webpush = IsahcWebPushClient::new().fail("Failed to create web push publisher")?; - let app = App::from(pool, webpush); + let publisher = Publisher::new().fail("Failed to create web push publisher")?; + let app = App::from(pool, publisher); match self.command { None => self.serve(app).await?, @@ -116,10 +116,7 @@ impl Args { Result::<_, Failed>::Ok(()) } - async fn serve<P>(self, app: App<P>) -> Result<(), Failed> - where - P: WebPushClient + Clone + Send + Sync + 'static, - { + async fn serve(self, app: App<Publisher>) -> Result<(), Failed> { let app = routes::routes(&app) .route_layer(middleware::from_fn(clock::middleware)) .route_layer(middleware::map_response(Self::server_info())) diff --git a/src/event/handlers/stream/test/vapid.rs b/src/event/handlers/stream/test/vapid.rs index dbc3929..4d7f2dd 100644 --- a/src/event/handlers/stream/test/vapid.rs +++ b/src/event/handlers/stream/test/vapid.rs @@ -7,7 +7,7 @@ use crate::test::{fixtures, fixtures::future::Expect as _}; #[tokio::test] async fn live_vapid_key_changes() { // Set up the context - let app = fixtures::scratch_app().await; + let app = fixtures::scratch_app_without_vapid().await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe to events @@ -42,7 +42,7 @@ async fn live_vapid_key_changes() { #[tokio::test] async fn stored_vapid_key_changes() { // Set up the context - let app = fixtures::scratch_app().await; + let app = fixtures::scratch_app_without_vapid().await; let resume_point = fixtures::boot::resume_point(&app).await; // Rotate the VAPID key @@ -77,7 +77,7 @@ async fn stored_vapid_key_changes() { #[tokio::test] async fn no_past_vapid_key_changes() { // Set up the context - let app = fixtures::scratch_app().await; + let app = fixtures::scratch_app_without_vapid().await; // Rotate the VAPID key diff --git a/src/event/mod.rs b/src/event/mod.rs index 83b0ce7..cb7c969 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -29,7 +29,7 @@ pub enum Event { // above - though heartbeat events contain only a type field and none of the other event gubbins. // They don't have to participate in sequence numbering, aren't generated from stored data, and // generally Are Weird. -#[derive(serde::Serialize)] +#[derive(Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum Heartbeat { Heartbeat, diff --git a/src/push/app.rs b/src/push/app.rs index 2bd6c25..ebfc220 100644 --- a/src/push/app.rs +++ b/src/push/app.rs @@ -1,29 +1,26 @@ -use futures::future::join_all; -use itertools::Itertools as _; use p256::ecdsa::VerifyingKey; use sqlx::SqlitePool; -use web_push::{ - ContentEncoding, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError, - WebPushMessage, WebPushMessageBuilder, -}; +use web_push::{SubscriptionInfo, WebPushError}; use super::repo::Provider as _; use crate::{ db, error::failed::{ErrorExt as _, Failed, ResultExt as _}, + event::Heartbeat, login::Login, + push::publisher::Publish, token::extract::Identity, vapid::repo::Provider as _, }; pub struct Push<P> { db: SqlitePool, - webpush: P, + publisher: P, } impl<P> Push<P> { - pub const fn new(db: SqlitePool, webpush: P) -> Self { - Self { db, webpush } + pub const fn new(db: SqlitePool, publisher: P) -> Self { + Self { db, publisher } } pub async fn subscribe( @@ -79,28 +76,8 @@ impl<P> Push<P> { impl<P> Push<P> where - P: WebPushClient, + P: Publish, { - fn prepare_ping( - signer: &PartialVapidSignatureBuilder, - subscription: &SubscriptionInfo, - ) -> Result<WebPushMessage, PushError> { - let signature = signer - .clone() - .add_sub_info(subscription) - .build() - .fail("Failed to build VAPID signature")?; - - let payload = "ping".as_bytes(); - - let mut message = WebPushMessageBuilder::new(subscription); - message.set_payload(ContentEncoding::Aes128Gcm, payload); - message.set_vapid_signature(signature); - let message = message.build().fail("Failed to build push message")?; - - Ok(message) - } - pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; @@ -116,22 +93,24 @@ where ) })?; - let pings: Vec<_> = subscriptions - .into_iter() - .map(|sub| Self::prepare_ping(&signer, &sub).map(|message| (sub, message))) - .try_collect()?; - - let deliveries = pings - .into_iter() - .map(async |(sub, message)| (sub, self.webpush.send(message).await)); + // We're about to perform some fairly IO-intensive outside actions. Holding a tx open + // across them raises the risk that other clients will encounter errors due to locks from + // this transaction, so release it here. We'll open a new transaction if there's something + // we need to write. + tx.commit().await.fail(db::failed::COMMIT)?; - let failures: Vec<_> = join_all(deliveries) + let failures = self + .publisher + .publish(Heartbeat::Heartbeat, signer, subscriptions) .await - .into_iter() - .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) - .collect(); + .fail("Failed to send push message")?; if !failures.is_empty() { + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + // Note that data integrity guarantees from the original transaction to read + // subscriptions may no longer be valid now. Time has passed. Depending on how slow + // delivering push notifications is, potentially a _lot_ of time has passed. + for (sub, err) in &failures { match err { // I _think_ this is the complete set of permanent failures. See @@ -152,13 +131,12 @@ where } } + tx.commit().await.fail(db::failed::COMMIT)?; + return Err(PushError::Delivery( failures.into_iter().map(|(_, err)| err).collect(), )); } - - tx.commit().await.fail(db::failed::COMMIT)?; - Ok(()) } } diff --git a/src/push/handlers/ping/mod.rs b/src/push/handlers/ping/mod.rs index db828fa..2a86984 100644 --- a/src/push/handlers/ping/mod.rs +++ b/src/push/handlers/ping/mod.rs @@ -1,7 +1,10 @@ use axum::{Json, extract::State, http::StatusCode}; -use web_push::WebPushClient; -use crate::{error::Internal, push::app::Push, token::extract::Identity}; +use crate::{ + error::Internal, + push::{Publish, app::Push}, + token::extract::Identity, +}; #[cfg(test)] mod test; @@ -15,7 +18,7 @@ pub async fn handler<P>( Json(_): Json<Request>, ) -> Result<StatusCode, Internal> where - P: WebPushClient, + P: Publish, { push.ping(&identity.login).await?; diff --git a/src/push/handlers/ping/test.rs b/src/push/handlers/ping/test.rs index 5725131..3481139 100644 --- a/src/push/handlers/ping/test.rs +++ b/src/push/handlers/ping/test.rs @@ -2,8 +2,9 @@ use axum::{ extract::{Json, State}, http::StatusCode, }; +use itertools::Itertools; -use crate::test::fixtures; +use crate::{event::Heartbeat, test::fixtures}; #[tokio::test] async fn ping_without_subscriptions() { @@ -11,18 +12,21 @@ async fn ping_without_subscriptions() { let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) .await .expect("sending a ping with no subscriptions always succeeds"); assert_eq!(StatusCode::ACCEPTED, response); - assert!(app.webpush().sent().is_empty()); + assert!( + app.publisher() + .sent() + .into_iter() + .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) + && publish.subscriptions.is_empty()) + .exactly_one() + .is_ok() + ); } // More complete testing requires that we figure out how to generate working p256 ECDH keys for diff --git a/src/push/handlers/subscribe/test.rs b/src/push/handlers/subscribe/test.rs index 1bc37a4..793bcef 100644 --- a/src/push/handlers/subscribe/test.rs +++ b/src/push/handlers/subscribe/test.rs @@ -3,33 +3,16 @@ use axum::{ http::StatusCode, }; -use crate::{ - push::app::SubscribeError, - test::{fixtures, fixtures::event}, -}; +use crate::{push::app::SubscribeError, test::fixtures}; #[tokio::test] async fn accepts_new_subscription() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - // Find out what that VAPID key is. - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with that key. @@ -41,7 +24,7 @@ async fn accepts_new_subscription() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; let response = super::handler(State(app.push()), subscriber, Json(request)) .await @@ -57,23 +40,9 @@ async fn accepts_repeat_subscription() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - // Find out what that VAPID key is. - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with that key. @@ -85,7 +54,7 @@ async fn accepts_repeat_subscription() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; let response = super::handler(State(app.push()), subscriber.clone(), Json(request.clone())) .await @@ -111,23 +80,9 @@ async fn rejects_duplicate_subscription() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - // Find out what that VAPID key is. - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with that key. @@ -139,7 +94,7 @@ async fn rejects_duplicate_subscription() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; super::handler(State(app.push()), subscriber.clone(), Json(request)) .await @@ -155,7 +110,7 @@ async fn rejects_duplicate_subscription() { auth: String::from("different-test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; let response = super::handler(State(app.push()), subscriber, Json(request)) .await @@ -170,24 +125,7 @@ async fn rejects_duplicate_subscription() { async fn rejects_stale_vapid_key() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - - // Find out what that VAPID key is. - - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let stale_vapid = fixtures::vapid::key(&app).await; // Change the VAPID key. @@ -200,16 +138,7 @@ async fn rejects_stale_vapid_key() { .await .expect("refreshing the VAPID key always succeeds"); - // Find out what the new VAPID key is. - - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let fresh_vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let fresh_vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with the original key. @@ -221,7 +150,7 @@ async fn rejects_stale_vapid_key() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid: stale_vapid, }; let response = super::handler(State(app.push()), subscriber, Json(request)) .await @@ -231,6 +160,6 @@ async fn rejects_stale_vapid_key() { assert!(matches!( response, - super::Error(SubscribeError::StaleVapidKey(key)) if key == fresh_vapid.key + super::Error(SubscribeError::StaleVapidKey(key)) if key == fresh_vapid )); } diff --git a/src/push/mod.rs b/src/push/mod.rs index 1394ea4..042991f 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -1,3 +1,6 @@ pub mod app; pub mod handlers; +mod publisher; pub mod repo; + +pub use publisher::{Publish, Publisher}; diff --git a/src/push/publisher.rs b/src/push/publisher.rs new file mode 100644 index 0000000..4092724 --- /dev/null +++ b/src/push/publisher.rs @@ -0,0 +1,83 @@ +use futures::future::join_all; +use itertools::Itertools as _; +use serde::Serialize; +use web_push::{ + ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo, + WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder, +}; + +use crate::error::failed::{Failed, ResultExt as _}; + +pub trait Publish { + fn publish<M>( + &self, + message: M, + signer: PartialVapidSignatureBuilder, + subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send, + ) -> impl Future<Output = Result<Vec<(SubscriptionInfo, WebPushError)>, Failed>> + Send + where + M: Serialize + Send + 'static; +} + +#[derive(Clone)] +pub struct Publisher { + client: IsahcWebPushClient, +} + +impl Publisher { + pub fn new() -> Result<Self, WebPushError> { + let client = IsahcWebPushClient::new()?; + Ok(Self { client }) + } + + fn prepare_message( + payload: &[u8], + signer: &PartialVapidSignatureBuilder, + subscription: &SubscriptionInfo, + ) -> Result<WebPushMessage, Failed> { + let signature = signer + .clone() + .add_sub_info(subscription) + .build() + .fail("Failed to build VAPID signature")?; + + let mut message = WebPushMessageBuilder::new(subscription); + message.set_payload(ContentEncoding::Aes128Gcm, payload); + message.set_vapid_signature(signature); + let message = message.build().fail("Failed to build push message")?; + + Ok(message) + } +} + +impl Publish for Publisher { + async fn publish<M>( + &self, + message: M, + signer: PartialVapidSignatureBuilder, + subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send, + ) -> Result<Vec<(SubscriptionInfo, WebPushError)>, Failed> + where + M: Serialize + Send + 'static, + { + let payload = serde_json::to_vec_pretty(&message) + .fail("Failed to encode web push message to JSON")?; + + let messages: Vec<_> = subscriptions + .into_iter() + .map(|sub| Self::prepare_message(&payload, &signer, &sub).map(|message| (sub, message))) + .try_collect()?; + + let deliveries = messages + .into_iter() + .map(async |(sub, message)| (sub, self.client.send(message).await)); + + let failures = join_all(deliveries) + .await + .into_iter() + .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) + .collect(); + + Ok(failures) + } +} diff --git a/src/routes.rs b/src/routes.rs index 1c07e78..0bf429a 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -3,15 +3,17 @@ use axum::{ response::Redirect, routing::{delete, get, post}, }; -use web_push::WebPushClient; use crate::{ - app::App, boot, conversation, event, expire, invite, login, message, push, setup, ui, vapid, + app::App, + boot, conversation, event, expire, invite, login, message, + push::{self, Publish}, + setup, ui, vapid, }; pub fn routes<P>(app: &App<P>) -> Router<App<P>> where - P: WebPushClient + Clone + Send + Sync + 'static, + P: Publish + Clone + Sync + Send + 'static, { // UI routes that can be accessed before the administrator completes setup. let ui_bootstrap = Router::new() diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 53bf31b..85935d6 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -12,8 +12,20 @@ pub mod invite; pub mod login; pub mod message; pub mod user; +pub mod vapid; pub async fn scratch_app() -> App<Client> { + let app = scratch_app_without_vapid().await; + + app.vapid() + .refresh_key(&now()) + .await + .expect("refreshing the VAPID key always succeeds"); + + app +} + +pub async fn scratch_app_without_vapid() -> App<Client> { let pool = db::prepare("sqlite::memory:", "sqlite::memory:") .await .expect("setting up in-memory sqlite database"); diff --git a/src/test/fixtures/vapid.rs b/src/test/fixtures/vapid.rs new file mode 100644 index 0000000..29cdf1a --- /dev/null +++ b/src/test/fixtures/vapid.rs @@ -0,0 +1,16 @@ +use p256::ecdsa::VerifyingKey; + +use crate::{app::App, test::fixtures}; + +pub async fn key<P>(app: &App<P>) -> VerifyingKey { + let boot = app.boot().snapshot().await.expect("boot always succeeds"); + let changed = boot + .events + .into_iter() + .filter_map(fixtures::event::vapid) + .filter_map(fixtures::event::vapid::changed) + .next_back() + .expect("the application has a vapid key"); + + changed.key +} diff --git a/src/test/webpush.rs b/src/test/webpush.rs index a611ad0..f33f03c 100644 --- a/src/test/webpush.rs +++ b/src/test/webpush.rs @@ -1,13 +1,16 @@ use std::{ + any::Any, mem, sync::{Arc, Mutex}, }; -use web_push::{WebPushClient, WebPushError, WebPushMessage}; +use web_push::{PartialVapidSignatureBuilder, SubscriptionInfo, WebPushError}; + +use crate::{error::failed::Failed, push::Publish}; #[derive(Clone)] pub struct Client { - sent: Arc<Mutex<Vec<WebPushMessage>>>, + sent: Arc<Mutex<Vec<Publication>>>, } impl Client { @@ -18,20 +21,48 @@ impl Client { } // Clears the list of sent messages (for all clones of this Client) when called, because we - // can't clone `WebPushMessage`s so we either need to move them or try to reconstruct them, - // either of which sucks but moving them sucks less. - pub fn sent(&self) -> Vec<WebPushMessage> { + // can't clone `Publications`s, so we either need to move them or try to reconstruct them. + pub fn sent(&self) -> Vec<Publication> { let mut sent = self.sent.lock().unwrap(); - mem::take(&mut *sent) + mem::take(&mut sent) } } -#[async_trait::async_trait] -impl WebPushClient for Client { - async fn send(&self, message: WebPushMessage) -> Result<(), WebPushError> { - let mut sent = self.sent.lock().unwrap(); - sent.push(message); +impl Publish for Client { + async fn publish<M>( + &self, + message: M, + _: PartialVapidSignatureBuilder, + subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send, + ) -> Result<Vec<(SubscriptionInfo, WebPushError)>, Failed> + where + M: Send + 'static, + { + let message: Box<dyn Any + Send> = Box::new(message); + let subscriptions = subscriptions.into_iter().collect(); + let publication = Publication { + message, + subscriptions, + }; + self.sent.lock().unwrap().push(publication); + + Ok(Vec::new()) + } +} + +pub struct Publication { + pub message: Box<dyn Any + Send>, + pub subscriptions: Vec<SubscriptionInfo>, +} - Ok(()) +impl Publication { + pub fn message_eq<M>(&self, candidate: &M) -> bool + where + M: PartialEq + 'static, + { + match self.message.downcast_ref::<M>() { + None => false, + Some(message) => message == candidate, + } } } |
