diff options
| -rw-r--r-- | src/app.rs | 18 | ||||
| -rw-r--r-- | src/boot/handlers/boot/test.rs | 14 | ||||
| -rw-r--r-- | src/cli.rs | 11 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/vapid.rs | 6 | ||||
| -rw-r--r-- | src/event/mod.rs | 2 | ||||
| -rw-r--r-- | src/push/app.rs | 68 | ||||
| -rw-r--r-- | src/push/handlers/ping/mod.rs | 9 | ||||
| -rw-r--r-- | src/push/handlers/ping/test.rs | 18 | ||||
| -rw-r--r-- | src/push/handlers/subscribe/test.rs | 95 | ||||
| -rw-r--r-- | src/push/mod.rs | 3 | ||||
| -rw-r--r-- | src/push/publisher.rs | 83 | ||||
| -rw-r--r-- | src/routes.rs | 8 | ||||
| -rw-r--r-- | src/test/fixtures/mod.rs | 12 | ||||
| -rw-r--r-- | src/test/fixtures/vapid.rs | 16 | ||||
| -rw-r--r-- | src/test/webpush.rs | 55 |
15 files changed, 235 insertions, 183 deletions
@@ -19,18 +19,18 @@ use crate::{ #[derive(Clone)] pub struct App<P> { db: SqlitePool, - webpush: P, + publisher: P, events: event::Broadcaster, token_events: token::Broadcaster, } impl<P> App<P> { - pub fn from(db: SqlitePool, webpush: P) -> Self { + pub fn from(db: SqlitePool, publisher: P) -> Self { let events = event::Broadcaster::default(); let token_events = token::Broadcaster::default(); Self { db, - webpush, + publisher, events, token_events, } @@ -62,11 +62,16 @@ impl<P> App<P> { Messages::new(self.db.clone(), self.events.clone()) } + #[cfg(test)] + pub fn publisher(&self) -> &P { + &self.publisher + } + pub fn push(&self) -> Push<P> where P: Clone, { - Push::new(self.db.clone(), self.webpush.clone()) + Push::new(self.db.clone(), self.publisher.clone()) } pub fn setup(&self) -> Setup { @@ -85,11 +90,6 @@ impl<P> App<P> { pub fn vapid(&self) -> Vapid { Vapid::new(self.db.clone(), self.events.clone()) } - - #[cfg(test)] - pub fn webpush(&self) -> &P { - &self.webpush - } } impl<P> FromRef<App<P>> for Boot { diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs index f192478..0aef694 100644 --- a/src/boot/handlers/boot/test.rs +++ b/src/boot/handlers/boot/test.rs @@ -84,11 +84,6 @@ async fn includes_messages() { async fn includes_vapid_key() { let app = fixtures::scratch_app().await; - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("key rotation always succeeds"); - let viewer = fixtures::identity::fictitious(); let response = super::handler(State(app.boot()), viewer) .await @@ -108,11 +103,6 @@ async fn includes_vapid_key() { async fn includes_only_latest_vapid_key() { let app = fixtures::scratch_app().await; - app.vapid() - .refresh_key(&fixtures::ancient()) - .await - .expect("key rotation always succeeds"); - let viewer = fixtures::identity::fictitious(); let response = super::handler(State(app.boot()), viewer.clone()) .await @@ -128,6 +118,10 @@ async fn includes_only_latest_vapid_key() { .expect("only one vapid key has been created"); app.vapid() + .revoke_key() + .await + .expect("key revocation always succeeds"); + app.vapid() .refresh_key(&fixtures::now()) .await .expect("key rotation always succeeds"); @@ -13,13 +13,13 @@ use axum::{ use clap::{CommandFactory, Parser, Subcommand}; use sqlx::sqlite::SqlitePool; use tokio::net; -use web_push::{IsahcWebPushClient, WebPushClient}; pub use crate::exit::Exit; use crate::{ app::App, clock, db, error::failed::{Failed, ResultExt as _}, + push::Publisher, routes, umask::Umask, }; @@ -101,8 +101,8 @@ impl Args { self.umask.set(); let pool = self.pool().await.fail("Failed to create database pool")?; - let webpush = IsahcWebPushClient::new().fail("Failed to create web push publisher")?; - let app = App::from(pool, webpush); + let publisher = Publisher::new().fail("Failed to create web push publisher")?; + let app = App::from(pool, publisher); match self.command { None => self.serve(app).await?, @@ -116,10 +116,7 @@ impl Args { Result::<_, Failed>::Ok(()) } - async fn serve<P>(self, app: App<P>) -> Result<(), Failed> - where - P: WebPushClient + Clone + Send + Sync + 'static, - { + async fn serve(self, app: App<Publisher>) -> Result<(), Failed> { let app = routes::routes(&app) .route_layer(middleware::from_fn(clock::middleware)) .route_layer(middleware::map_response(Self::server_info())) diff --git a/src/event/handlers/stream/test/vapid.rs b/src/event/handlers/stream/test/vapid.rs index dbc3929..4d7f2dd 100644 --- a/src/event/handlers/stream/test/vapid.rs +++ b/src/event/handlers/stream/test/vapid.rs @@ -7,7 +7,7 @@ use crate::test::{fixtures, fixtures::future::Expect as _}; #[tokio::test] async fn live_vapid_key_changes() { // Set up the context - let app = fixtures::scratch_app().await; + let app = fixtures::scratch_app_without_vapid().await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe to events @@ -42,7 +42,7 @@ async fn live_vapid_key_changes() { #[tokio::test] async fn stored_vapid_key_changes() { // Set up the context - let app = fixtures::scratch_app().await; + let app = fixtures::scratch_app_without_vapid().await; let resume_point = fixtures::boot::resume_point(&app).await; // Rotate the VAPID key @@ -77,7 +77,7 @@ async fn stored_vapid_key_changes() { #[tokio::test] async fn no_past_vapid_key_changes() { // Set up the context - let app = fixtures::scratch_app().await; + let app = fixtures::scratch_app_without_vapid().await; // Rotate the VAPID key diff --git a/src/event/mod.rs b/src/event/mod.rs index 83b0ce7..cb7c969 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -29,7 +29,7 @@ pub enum Event { // above - though heartbeat events contain only a type field and none of the other event gubbins. // They don't have to participate in sequence numbering, aren't generated from stored data, and // generally Are Weird. -#[derive(serde::Serialize)] +#[derive(Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum Heartbeat { Heartbeat, diff --git a/src/push/app.rs b/src/push/app.rs index 2bd6c25..ebfc220 100644 --- a/src/push/app.rs +++ b/src/push/app.rs @@ -1,29 +1,26 @@ -use futures::future::join_all; -use itertools::Itertools as _; use p256::ecdsa::VerifyingKey; use sqlx::SqlitePool; -use web_push::{ - ContentEncoding, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError, - WebPushMessage, WebPushMessageBuilder, -}; +use web_push::{SubscriptionInfo, WebPushError}; use super::repo::Provider as _; use crate::{ db, error::failed::{ErrorExt as _, Failed, ResultExt as _}, + event::Heartbeat, login::Login, + push::publisher::Publish, token::extract::Identity, vapid::repo::Provider as _, }; pub struct Push<P> { db: SqlitePool, - webpush: P, + publisher: P, } impl<P> Push<P> { - pub const fn new(db: SqlitePool, webpush: P) -> Self { - Self { db, webpush } + pub const fn new(db: SqlitePool, publisher: P) -> Self { + Self { db, publisher } } pub async fn subscribe( @@ -79,28 +76,8 @@ impl<P> Push<P> { impl<P> Push<P> where - P: WebPushClient, + P: Publish, { - fn prepare_ping( - signer: &PartialVapidSignatureBuilder, - subscription: &SubscriptionInfo, - ) -> Result<WebPushMessage, PushError> { - let signature = signer - .clone() - .add_sub_info(subscription) - .build() - .fail("Failed to build VAPID signature")?; - - let payload = "ping".as_bytes(); - - let mut message = WebPushMessageBuilder::new(subscription); - message.set_payload(ContentEncoding::Aes128Gcm, payload); - message.set_vapid_signature(signature); - let message = message.build().fail("Failed to build push message")?; - - Ok(message) - } - pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> { let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; @@ -116,22 +93,24 @@ where ) })?; - let pings: Vec<_> = subscriptions - .into_iter() - .map(|sub| Self::prepare_ping(&signer, &sub).map(|message| (sub, message))) - .try_collect()?; - - let deliveries = pings - .into_iter() - .map(async |(sub, message)| (sub, self.webpush.send(message).await)); + // We're about to perform some fairly IO-intensive outside actions. Holding a tx open + // across them raises the risk that other clients will encounter errors due to locks from + // this transaction, so release it here. We'll open a new transaction if there's something + // we need to write. + tx.commit().await.fail(db::failed::COMMIT)?; - let failures: Vec<_> = join_all(deliveries) + let failures = self + .publisher + .publish(Heartbeat::Heartbeat, signer, subscriptions) .await - .into_iter() - .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) - .collect(); + .fail("Failed to send push message")?; if !failures.is_empty() { + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + // Note that data integrity guarantees from the original transaction to read + // subscriptions may no longer be valid now. Time has passed. Depending on how slow + // delivering push notifications is, potentially a _lot_ of time has passed. + for (sub, err) in &failures { match err { // I _think_ this is the complete set of permanent failures. See @@ -152,13 +131,12 @@ where } } + tx.commit().await.fail(db::failed::COMMIT)?; + return Err(PushError::Delivery( failures.into_iter().map(|(_, err)| err).collect(), )); } - - tx.commit().await.fail(db::failed::COMMIT)?; - Ok(()) } } diff --git a/src/push/handlers/ping/mod.rs b/src/push/handlers/ping/mod.rs index db828fa..2a86984 100644 --- a/src/push/handlers/ping/mod.rs +++ b/src/push/handlers/ping/mod.rs @@ -1,7 +1,10 @@ use axum::{Json, extract::State, http::StatusCode}; -use web_push::WebPushClient; -use crate::{error::Internal, push::app::Push, token::extract::Identity}; +use crate::{ + error::Internal, + push::{Publish, app::Push}, + token::extract::Identity, +}; #[cfg(test)] mod test; @@ -15,7 +18,7 @@ pub async fn handler<P>( Json(_): Json<Request>, ) -> Result<StatusCode, Internal> where - P: WebPushClient, + P: Publish, { push.ping(&identity.login).await?; diff --git a/src/push/handlers/ping/test.rs b/src/push/handlers/ping/test.rs index 5725131..3481139 100644 --- a/src/push/handlers/ping/test.rs +++ b/src/push/handlers/ping/test.rs @@ -2,8 +2,9 @@ use axum::{ extract::{Json, State}, http::StatusCode, }; +use itertools::Itertools; -use crate::test::fixtures; +use crate::{event::Heartbeat, test::fixtures}; #[tokio::test] async fn ping_without_subscriptions() { @@ -11,18 +12,21 @@ async fn ping_without_subscriptions() { let recipient = fixtures::identity::create(&app, &fixtures::now()).await; - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - let response = super::handler(State(app.push()), recipient, Json(super::Request {})) .await .expect("sending a ping with no subscriptions always succeeds"); assert_eq!(StatusCode::ACCEPTED, response); - assert!(app.webpush().sent().is_empty()); + assert!( + app.publisher() + .sent() + .into_iter() + .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) + && publish.subscriptions.is_empty()) + .exactly_one() + .is_ok() + ); } // More complete testing requires that we figure out how to generate working p256 ECDH keys for diff --git a/src/push/handlers/subscribe/test.rs b/src/push/handlers/subscribe/test.rs index 1bc37a4..793bcef 100644 --- a/src/push/handlers/subscribe/test.rs +++ b/src/push/handlers/subscribe/test.rs @@ -3,33 +3,16 @@ use axum::{ http::StatusCode, }; -use crate::{ - push::app::SubscribeError, - test::{fixtures, fixtures::event}, -}; +use crate::{push::app::SubscribeError, test::fixtures}; #[tokio::test] async fn accepts_new_subscription() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - // Find out what that VAPID key is. - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with that key. @@ -41,7 +24,7 @@ async fn accepts_new_subscription() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; let response = super::handler(State(app.push()), subscriber, Json(request)) .await @@ -57,23 +40,9 @@ async fn accepts_repeat_subscription() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - // Find out what that VAPID key is. - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with that key. @@ -85,7 +54,7 @@ async fn accepts_repeat_subscription() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; let response = super::handler(State(app.push()), subscriber.clone(), Json(request.clone())) .await @@ -111,23 +80,9 @@ async fn rejects_duplicate_subscription() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - // Find out what that VAPID key is. - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with that key. @@ -139,7 +94,7 @@ async fn rejects_duplicate_subscription() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; super::handler(State(app.push()), subscriber.clone(), Json(request)) .await @@ -155,7 +110,7 @@ async fn rejects_duplicate_subscription() { auth: String::from("different-test-auth-value"), }, }, - vapid: vapid.key, + vapid, }; let response = super::handler(State(app.push()), subscriber, Json(request)) .await @@ -170,24 +125,7 @@ async fn rejects_duplicate_subscription() { async fn rejects_stale_vapid_key() { let app = fixtures::scratch_app().await; let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - - // Issue a VAPID key. - - app.vapid() - .refresh_key(&fixtures::now()) - .await - .expect("refreshing the VAPID key always succeeds"); - - // Find out what that VAPID key is. - - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let stale_vapid = fixtures::vapid::key(&app).await; // Change the VAPID key. @@ -200,16 +138,7 @@ async fn rejects_stale_vapid_key() { .await .expect("refreshing the VAPID key always succeeds"); - // Find out what the new VAPID key is. - - let boot = app.boot().snapshot().await.expect("boot always succeeds"); - let fresh_vapid = boot - .events - .into_iter() - .filter_map(event::vapid) - .filter_map(event::vapid::changed) - .next_back() - .expect("the application will have a vapid key after a refresh"); + let fresh_vapid = fixtures::vapid::key(&app).await; // Create a dummy subscription with the original key. @@ -221,7 +150,7 @@ async fn rejects_stale_vapid_key() { auth: String::from("test-auth-value"), }, }, - vapid: vapid.key, + vapid: stale_vapid, }; let response = super::handler(State(app.push()), subscriber, Json(request)) .await @@ -231,6 +160,6 @@ async fn rejects_stale_vapid_key() { assert!(matches!( response, - super::Error(SubscribeError::StaleVapidKey(key)) if key == fresh_vapid.key + super::Error(SubscribeError::StaleVapidKey(key)) if key == fresh_vapid )); } diff --git a/src/push/mod.rs b/src/push/mod.rs index 1394ea4..042991f 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -1,3 +1,6 @@ pub mod app; pub mod handlers; +mod publisher; pub mod repo; + +pub use publisher::{Publish, Publisher}; diff --git a/src/push/publisher.rs b/src/push/publisher.rs new file mode 100644 index 0000000..4092724 --- /dev/null +++ b/src/push/publisher.rs @@ -0,0 +1,83 @@ +use futures::future::join_all; +use itertools::Itertools as _; +use serde::Serialize; +use web_push::{ + ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo, + WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder, +}; + +use crate::error::failed::{Failed, ResultExt as _}; + +pub trait Publish { + fn publish<M>( + &self, + message: M, + signer: PartialVapidSignatureBuilder, + subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send, + ) -> impl Future<Output = Result<Vec<(SubscriptionInfo, WebPushError)>, Failed>> + Send + where + M: Serialize + Send + 'static; +} + +#[derive(Clone)] +pub struct Publisher { + client: IsahcWebPushClient, +} + +impl Publisher { + pub fn new() -> Result<Self, WebPushError> { + let client = IsahcWebPushClient::new()?; + Ok(Self { client }) + } + + fn prepare_message( + payload: &[u8], + signer: &PartialVapidSignatureBuilder, + subscription: &SubscriptionInfo, + ) -> Result<WebPushMessage, Failed> { + let signature = signer + .clone() + .add_sub_info(subscription) + .build() + .fail("Failed to build VAPID signature")?; + + let mut message = WebPushMessageBuilder::new(subscription); + message.set_payload(ContentEncoding::Aes128Gcm, payload); + message.set_vapid_signature(signature); + let message = message.build().fail("Failed to build push message")?; + + Ok(message) + } +} + +impl Publish for Publisher { + async fn publish<M>( + &self, + message: M, + signer: PartialVapidSignatureBuilder, + subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send, + ) -> Result<Vec<(SubscriptionInfo, WebPushError)>, Failed> + where + M: Serialize + Send + 'static, + { + let payload = serde_json::to_vec_pretty(&message) + .fail("Failed to encode web push message to JSON")?; + + let messages: Vec<_> = subscriptions + .into_iter() + .map(|sub| Self::prepare_message(&payload, &signer, &sub).map(|message| (sub, message))) + .try_collect()?; + + let deliveries = messages + .into_iter() + .map(async |(sub, message)| (sub, self.client.send(message).await)); + + let failures = join_all(deliveries) + .await + .into_iter() + .filter_map(|(sub, result)| result.err().map(|err| (sub, err))) + .collect(); + + Ok(failures) + } +} diff --git a/src/routes.rs b/src/routes.rs index 1c07e78..0bf429a 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -3,15 +3,17 @@ use axum::{ response::Redirect, routing::{delete, get, post}, }; -use web_push::WebPushClient; use crate::{ - app::App, boot, conversation, event, expire, invite, login, message, push, setup, ui, vapid, + app::App, + boot, conversation, event, expire, invite, login, message, + push::{self, Publish}, + setup, ui, vapid, }; pub fn routes<P>(app: &App<P>) -> Router<App<P>> where - P: WebPushClient + Clone + Send + Sync + 'static, + P: Publish + Clone + Sync + Send + 'static, { // UI routes that can be accessed before the administrator completes setup. let ui_bootstrap = Router::new() diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 53bf31b..85935d6 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -12,8 +12,20 @@ pub mod invite; pub mod login; pub mod message; pub mod user; +pub mod vapid; pub async fn scratch_app() -> App<Client> { + let app = scratch_app_without_vapid().await; + + app.vapid() + .refresh_key(&now()) + .await + .expect("refreshing the VAPID key always succeeds"); + + app +} + +pub async fn scratch_app_without_vapid() -> App<Client> { let pool = db::prepare("sqlite::memory:", "sqlite::memory:") .await .expect("setting up in-memory sqlite database"); diff --git a/src/test/fixtures/vapid.rs b/src/test/fixtures/vapid.rs new file mode 100644 index 0000000..29cdf1a --- /dev/null +++ b/src/test/fixtures/vapid.rs @@ -0,0 +1,16 @@ +use p256::ecdsa::VerifyingKey; + +use crate::{app::App, test::fixtures}; + +pub async fn key<P>(app: &App<P>) -> VerifyingKey { + let boot = app.boot().snapshot().await.expect("boot always succeeds"); + let changed = boot + .events + .into_iter() + .filter_map(fixtures::event::vapid) + .filter_map(fixtures::event::vapid::changed) + .next_back() + .expect("the application has a vapid key"); + + changed.key +} diff --git a/src/test/webpush.rs b/src/test/webpush.rs index a611ad0..f33f03c 100644 --- a/src/test/webpush.rs +++ b/src/test/webpush.rs @@ -1,13 +1,16 @@ use std::{ + any::Any, mem, sync::{Arc, Mutex}, }; -use web_push::{WebPushClient, WebPushError, WebPushMessage}; +use web_push::{PartialVapidSignatureBuilder, SubscriptionInfo, WebPushError}; + +use crate::{error::failed::Failed, push::Publish}; #[derive(Clone)] pub struct Client { - sent: Arc<Mutex<Vec<WebPushMessage>>>, + sent: Arc<Mutex<Vec<Publication>>>, } impl Client { @@ -18,20 +21,48 @@ impl Client { } // Clears the list of sent messages (for all clones of this Client) when called, because we - // can't clone `WebPushMessage`s so we either need to move them or try to reconstruct them, - // either of which sucks but moving them sucks less. - pub fn sent(&self) -> Vec<WebPushMessage> { + // can't clone `Publications`s, so we either need to move them or try to reconstruct them. + pub fn sent(&self) -> Vec<Publication> { let mut sent = self.sent.lock().unwrap(); - mem::take(&mut *sent) + mem::take(&mut sent) } } -#[async_trait::async_trait] -impl WebPushClient for Client { - async fn send(&self, message: WebPushMessage) -> Result<(), WebPushError> { - let mut sent = self.sent.lock().unwrap(); - sent.push(message); +impl Publish for Client { + async fn publish<M>( + &self, + message: M, + _: PartialVapidSignatureBuilder, + subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send, + ) -> Result<Vec<(SubscriptionInfo, WebPushError)>, Failed> + where + M: Send + 'static, + { + let message: Box<dyn Any + Send> = Box::new(message); + let subscriptions = subscriptions.into_iter().collect(); + let publication = Publication { + message, + subscriptions, + }; + self.sent.lock().unwrap().push(publication); + + Ok(Vec::new()) + } +} + +pub struct Publication { + pub message: Box<dyn Any + Send>, + pub subscriptions: Vec<SubscriptionInfo>, +} - Ok(()) +impl Publication { + pub fn message_eq<M>(&self, candidate: &M) -> bool + where + M: PartialEq + 'static, + { + match self.message.downcast_ref::<M>() { + None => false, + Some(message) => message == candidate, + } } } |
