summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/app.rs18
-rw-r--r--src/boot/handlers/boot/test.rs14
-rw-r--r--src/cli.rs11
-rw-r--r--src/event/handlers/stream/test/vapid.rs6
-rw-r--r--src/event/mod.rs2
-rw-r--r--src/push/app.rs68
-rw-r--r--src/push/handlers/ping/mod.rs9
-rw-r--r--src/push/handlers/ping/test.rs18
-rw-r--r--src/push/handlers/subscribe/test.rs95
-rw-r--r--src/push/mod.rs3
-rw-r--r--src/push/publisher.rs83
-rw-r--r--src/routes.rs8
-rw-r--r--src/test/fixtures/mod.rs12
-rw-r--r--src/test/fixtures/vapid.rs16
-rw-r--r--src/test/webpush.rs55
15 files changed, 235 insertions, 183 deletions
diff --git a/src/app.rs b/src/app.rs
index 098ae9f..6261d34 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -19,18 +19,18 @@ use crate::{
#[derive(Clone)]
pub struct App<P> {
db: SqlitePool,
- webpush: P,
+ publisher: P,
events: event::Broadcaster,
token_events: token::Broadcaster,
}
impl<P> App<P> {
- pub fn from(db: SqlitePool, webpush: P) -> Self {
+ pub fn from(db: SqlitePool, publisher: P) -> Self {
let events = event::Broadcaster::default();
let token_events = token::Broadcaster::default();
Self {
db,
- webpush,
+ publisher,
events,
token_events,
}
@@ -62,11 +62,16 @@ impl<P> App<P> {
Messages::new(self.db.clone(), self.events.clone())
}
+ #[cfg(test)]
+ pub fn publisher(&self) -> &P {
+ &self.publisher
+ }
+
pub fn push(&self) -> Push<P>
where
P: Clone,
{
- Push::new(self.db.clone(), self.webpush.clone())
+ Push::new(self.db.clone(), self.publisher.clone())
}
pub fn setup(&self) -> Setup {
@@ -85,11 +90,6 @@ impl<P> App<P> {
pub fn vapid(&self) -> Vapid {
Vapid::new(self.db.clone(), self.events.clone())
}
-
- #[cfg(test)]
- pub fn webpush(&self) -> &P {
- &self.webpush
- }
}
impl<P> FromRef<App<P>> for Boot {
diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs
index f192478..0aef694 100644
--- a/src/boot/handlers/boot/test.rs
+++ b/src/boot/handlers/boot/test.rs
@@ -84,11 +84,6 @@ async fn includes_messages() {
async fn includes_vapid_key() {
let app = fixtures::scratch_app().await;
- app.vapid()
- .refresh_key(&fixtures::now())
- .await
- .expect("key rotation always succeeds");
-
let viewer = fixtures::identity::fictitious();
let response = super::handler(State(app.boot()), viewer)
.await
@@ -108,11 +103,6 @@ async fn includes_vapid_key() {
async fn includes_only_latest_vapid_key() {
let app = fixtures::scratch_app().await;
- app.vapid()
- .refresh_key(&fixtures::ancient())
- .await
- .expect("key rotation always succeeds");
-
let viewer = fixtures::identity::fictitious();
let response = super::handler(State(app.boot()), viewer.clone())
.await
@@ -128,6 +118,10 @@ async fn includes_only_latest_vapid_key() {
.expect("only one vapid key has been created");
app.vapid()
+ .revoke_key()
+ .await
+ .expect("key revocation always succeeds");
+ app.vapid()
.refresh_key(&fixtures::now())
.await
.expect("key rotation always succeeds");
diff --git a/src/cli.rs b/src/cli.rs
index 36f3d31..971d1f9 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -13,13 +13,13 @@ use axum::{
use clap::{CommandFactory, Parser, Subcommand};
use sqlx::sqlite::SqlitePool;
use tokio::net;
-use web_push::{IsahcWebPushClient, WebPushClient};
pub use crate::exit::Exit;
use crate::{
app::App,
clock, db,
error::failed::{Failed, ResultExt as _},
+ push::Publisher,
routes,
umask::Umask,
};
@@ -101,8 +101,8 @@ impl Args {
self.umask.set();
let pool = self.pool().await.fail("Failed to create database pool")?;
- let webpush = IsahcWebPushClient::new().fail("Failed to create web push publisher")?;
- let app = App::from(pool, webpush);
+ let publisher = Publisher::new().fail("Failed to create web push publisher")?;
+ let app = App::from(pool, publisher);
match self.command {
None => self.serve(app).await?,
@@ -116,10 +116,7 @@ impl Args {
Result::<_, Failed>::Ok(())
}
- async fn serve<P>(self, app: App<P>) -> Result<(), Failed>
- where
- P: WebPushClient + Clone + Send + Sync + 'static,
- {
+ async fn serve(self, app: App<Publisher>) -> Result<(), Failed> {
let app = routes::routes(&app)
.route_layer(middleware::from_fn(clock::middleware))
.route_layer(middleware::map_response(Self::server_info()))
diff --git a/src/event/handlers/stream/test/vapid.rs b/src/event/handlers/stream/test/vapid.rs
index dbc3929..4d7f2dd 100644
--- a/src/event/handlers/stream/test/vapid.rs
+++ b/src/event/handlers/stream/test/vapid.rs
@@ -7,7 +7,7 @@ use crate::test::{fixtures, fixtures::future::Expect as _};
#[tokio::test]
async fn live_vapid_key_changes() {
// Set up the context
- let app = fixtures::scratch_app().await;
+ let app = fixtures::scratch_app_without_vapid().await;
let resume_point = fixtures::boot::resume_point(&app).await;
// Subscribe to events
@@ -42,7 +42,7 @@ async fn live_vapid_key_changes() {
#[tokio::test]
async fn stored_vapid_key_changes() {
// Set up the context
- let app = fixtures::scratch_app().await;
+ let app = fixtures::scratch_app_without_vapid().await;
let resume_point = fixtures::boot::resume_point(&app).await;
// Rotate the VAPID key
@@ -77,7 +77,7 @@ async fn stored_vapid_key_changes() {
#[tokio::test]
async fn no_past_vapid_key_changes() {
// Set up the context
- let app = fixtures::scratch_app().await;
+ let app = fixtures::scratch_app_without_vapid().await;
// Rotate the VAPID key
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 83b0ce7..cb7c969 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -29,7 +29,7 @@ pub enum Event {
// above - though heartbeat events contain only a type field and none of the other event gubbins.
// They don't have to participate in sequence numbering, aren't generated from stored data, and
// generally Are Weird.
-#[derive(serde::Serialize)]
+#[derive(Eq, PartialEq, serde::Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Heartbeat {
Heartbeat,
diff --git a/src/push/app.rs b/src/push/app.rs
index 2bd6c25..ebfc220 100644
--- a/src/push/app.rs
+++ b/src/push/app.rs
@@ -1,29 +1,26 @@
-use futures::future::join_all;
-use itertools::Itertools as _;
use p256::ecdsa::VerifyingKey;
use sqlx::SqlitePool;
-use web_push::{
- ContentEncoding, PartialVapidSignatureBuilder, SubscriptionInfo, WebPushClient, WebPushError,
- WebPushMessage, WebPushMessageBuilder,
-};
+use web_push::{SubscriptionInfo, WebPushError};
use super::repo::Provider as _;
use crate::{
db,
error::failed::{ErrorExt as _, Failed, ResultExt as _},
+ event::Heartbeat,
login::Login,
+ push::publisher::Publish,
token::extract::Identity,
vapid::repo::Provider as _,
};
pub struct Push<P> {
db: SqlitePool,
- webpush: P,
+ publisher: P,
}
impl<P> Push<P> {
- pub const fn new(db: SqlitePool, webpush: P) -> Self {
- Self { db, webpush }
+ pub const fn new(db: SqlitePool, publisher: P) -> Self {
+ Self { db, publisher }
}
pub async fn subscribe(
@@ -79,28 +76,8 @@ impl<P> Push<P> {
impl<P> Push<P>
where
- P: WebPushClient,
+ P: Publish,
{
- fn prepare_ping(
- signer: &PartialVapidSignatureBuilder,
- subscription: &SubscriptionInfo,
- ) -> Result<WebPushMessage, PushError> {
- let signature = signer
- .clone()
- .add_sub_info(subscription)
- .build()
- .fail("Failed to build VAPID signature")?;
-
- let payload = "ping".as_bytes();
-
- let mut message = WebPushMessageBuilder::new(subscription);
- message.set_payload(ContentEncoding::Aes128Gcm, payload);
- message.set_vapid_signature(signature);
- let message = message.build().fail("Failed to build push message")?;
-
- Ok(message)
- }
-
pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> {
let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
@@ -116,22 +93,24 @@ where
)
})?;
- let pings: Vec<_> = subscriptions
- .into_iter()
- .map(|sub| Self::prepare_ping(&signer, &sub).map(|message| (sub, message)))
- .try_collect()?;
-
- let deliveries = pings
- .into_iter()
- .map(async |(sub, message)| (sub, self.webpush.send(message).await));
+ // We're about to perform some fairly IO-intensive outside actions. Holding a tx open
+ // across them raises the risk that other clients will encounter errors due to locks from
+ // this transaction, so release it here. We'll open a new transaction if there's something
+ // we need to write.
+ tx.commit().await.fail(db::failed::COMMIT)?;
- let failures: Vec<_> = join_all(deliveries)
+ let failures = self
+ .publisher
+ .publish(Heartbeat::Heartbeat, signer, subscriptions)
.await
- .into_iter()
- .filter_map(|(sub, result)| result.err().map(|err| (sub, err)))
- .collect();
+ .fail("Failed to send push message")?;
if !failures.is_empty() {
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
+ // Note that data integrity guarantees from the original transaction to read
+ // subscriptions may no longer be valid now. Time has passed. Depending on how slow
+ // delivering push notifications is, potentially a _lot_ of time has passed.
+
for (sub, err) in &failures {
match err {
// I _think_ this is the complete set of permanent failures. See
@@ -152,13 +131,12 @@ where
}
}
+ tx.commit().await.fail(db::failed::COMMIT)?;
+
return Err(PushError::Delivery(
failures.into_iter().map(|(_, err)| err).collect(),
));
}
-
- tx.commit().await.fail(db::failed::COMMIT)?;
-
Ok(())
}
}
diff --git a/src/push/handlers/ping/mod.rs b/src/push/handlers/ping/mod.rs
index db828fa..2a86984 100644
--- a/src/push/handlers/ping/mod.rs
+++ b/src/push/handlers/ping/mod.rs
@@ -1,7 +1,10 @@
use axum::{Json, extract::State, http::StatusCode};
-use web_push::WebPushClient;
-use crate::{error::Internal, push::app::Push, token::extract::Identity};
+use crate::{
+ error::Internal,
+ push::{Publish, app::Push},
+ token::extract::Identity,
+};
#[cfg(test)]
mod test;
@@ -15,7 +18,7 @@ pub async fn handler<P>(
Json(_): Json<Request>,
) -> Result<StatusCode, Internal>
where
- P: WebPushClient,
+ P: Publish,
{
push.ping(&identity.login).await?;
diff --git a/src/push/handlers/ping/test.rs b/src/push/handlers/ping/test.rs
index 5725131..3481139 100644
--- a/src/push/handlers/ping/test.rs
+++ b/src/push/handlers/ping/test.rs
@@ -2,8 +2,9 @@ use axum::{
extract::{Json, State},
http::StatusCode,
};
+use itertools::Itertools;
-use crate::test::fixtures;
+use crate::{event::Heartbeat, test::fixtures};
#[tokio::test]
async fn ping_without_subscriptions() {
@@ -11,18 +12,21 @@ async fn ping_without_subscriptions() {
let recipient = fixtures::identity::create(&app, &fixtures::now()).await;
- app.vapid()
- .refresh_key(&fixtures::now())
- .await
- .expect("refreshing the VAPID key always succeeds");
-
let response = super::handler(State(app.push()), recipient, Json(super::Request {}))
.await
.expect("sending a ping with no subscriptions always succeeds");
assert_eq!(StatusCode::ACCEPTED, response);
- assert!(app.webpush().sent().is_empty());
+ assert!(
+ app.publisher()
+ .sent()
+ .into_iter()
+ .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
+ && publish.subscriptions.is_empty())
+ .exactly_one()
+ .is_ok()
+ );
}
// More complete testing requires that we figure out how to generate working p256 ECDH keys for
diff --git a/src/push/handlers/subscribe/test.rs b/src/push/handlers/subscribe/test.rs
index 1bc37a4..793bcef 100644
--- a/src/push/handlers/subscribe/test.rs
+++ b/src/push/handlers/subscribe/test.rs
@@ -3,33 +3,16 @@ use axum::{
http::StatusCode,
};
-use crate::{
- push::app::SubscribeError,
- test::{fixtures, fixtures::event},
-};
+use crate::{push::app::SubscribeError, test::fixtures};
#[tokio::test]
async fn accepts_new_subscription() {
let app = fixtures::scratch_app().await;
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- // Issue a VAPID key.
-
- app.vapid()
- .refresh_key(&fixtures::now())
- .await
- .expect("refreshing the VAPID key always succeeds");
-
// Find out what that VAPID key is.
- let boot = app.boot().snapshot().await.expect("boot always succeeds");
- let vapid = boot
- .events
- .into_iter()
- .filter_map(event::vapid)
- .filter_map(event::vapid::changed)
- .next_back()
- .expect("the application will have a vapid key after a refresh");
+ let vapid = fixtures::vapid::key(&app).await;
// Create a dummy subscription with that key.
@@ -41,7 +24,7 @@ async fn accepts_new_subscription() {
auth: String::from("test-auth-value"),
},
},
- vapid: vapid.key,
+ vapid,
};
let response = super::handler(State(app.push()), subscriber, Json(request))
.await
@@ -57,23 +40,9 @@ async fn accepts_repeat_subscription() {
let app = fixtures::scratch_app().await;
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- // Issue a VAPID key.
-
- app.vapid()
- .refresh_key(&fixtures::now())
- .await
- .expect("refreshing the VAPID key always succeeds");
-
// Find out what that VAPID key is.
- let boot = app.boot().snapshot().await.expect("boot always succeeds");
- let vapid = boot
- .events
- .into_iter()
- .filter_map(event::vapid)
- .filter_map(event::vapid::changed)
- .next_back()
- .expect("the application will have a vapid key after a refresh");
+ let vapid = fixtures::vapid::key(&app).await;
// Create a dummy subscription with that key.
@@ -85,7 +54,7 @@ async fn accepts_repeat_subscription() {
auth: String::from("test-auth-value"),
},
},
- vapid: vapid.key,
+ vapid,
};
let response = super::handler(State(app.push()), subscriber.clone(), Json(request.clone()))
.await
@@ -111,23 +80,9 @@ async fn rejects_duplicate_subscription() {
let app = fixtures::scratch_app().await;
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
- // Issue a VAPID key.
-
- app.vapid()
- .refresh_key(&fixtures::now())
- .await
- .expect("refreshing the VAPID key always succeeds");
-
// Find out what that VAPID key is.
- let boot = app.boot().snapshot().await.expect("boot always succeeds");
- let vapid = boot
- .events
- .into_iter()
- .filter_map(event::vapid)
- .filter_map(event::vapid::changed)
- .next_back()
- .expect("the application will have a vapid key after a refresh");
+ let vapid = fixtures::vapid::key(&app).await;
// Create a dummy subscription with that key.
@@ -139,7 +94,7 @@ async fn rejects_duplicate_subscription() {
auth: String::from("test-auth-value"),
},
},
- vapid: vapid.key,
+ vapid,
};
super::handler(State(app.push()), subscriber.clone(), Json(request))
.await
@@ -155,7 +110,7 @@ async fn rejects_duplicate_subscription() {
auth: String::from("different-test-auth-value"),
},
},
- vapid: vapid.key,
+ vapid,
};
let response = super::handler(State(app.push()), subscriber, Json(request))
.await
@@ -170,24 +125,7 @@ async fn rejects_duplicate_subscription() {
async fn rejects_stale_vapid_key() {
let app = fixtures::scratch_app().await;
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
-
- // Issue a VAPID key.
-
- app.vapid()
- .refresh_key(&fixtures::now())
- .await
- .expect("refreshing the VAPID key always succeeds");
-
- // Find out what that VAPID key is.
-
- let boot = app.boot().snapshot().await.expect("boot always succeeds");
- let vapid = boot
- .events
- .into_iter()
- .filter_map(event::vapid)
- .filter_map(event::vapid::changed)
- .next_back()
- .expect("the application will have a vapid key after a refresh");
+ let stale_vapid = fixtures::vapid::key(&app).await;
// Change the VAPID key.
@@ -200,16 +138,7 @@ async fn rejects_stale_vapid_key() {
.await
.expect("refreshing the VAPID key always succeeds");
- // Find out what the new VAPID key is.
-
- let boot = app.boot().snapshot().await.expect("boot always succeeds");
- let fresh_vapid = boot
- .events
- .into_iter()
- .filter_map(event::vapid)
- .filter_map(event::vapid::changed)
- .next_back()
- .expect("the application will have a vapid key after a refresh");
+ let fresh_vapid = fixtures::vapid::key(&app).await;
// Create a dummy subscription with the original key.
@@ -221,7 +150,7 @@ async fn rejects_stale_vapid_key() {
auth: String::from("test-auth-value"),
},
},
- vapid: vapid.key,
+ vapid: stale_vapid,
};
let response = super::handler(State(app.push()), subscriber, Json(request))
.await
@@ -231,6 +160,6 @@ async fn rejects_stale_vapid_key() {
assert!(matches!(
response,
- super::Error(SubscribeError::StaleVapidKey(key)) if key == fresh_vapid.key
+ super::Error(SubscribeError::StaleVapidKey(key)) if key == fresh_vapid
));
}
diff --git a/src/push/mod.rs b/src/push/mod.rs
index 1394ea4..042991f 100644
--- a/src/push/mod.rs
+++ b/src/push/mod.rs
@@ -1,3 +1,6 @@
pub mod app;
pub mod handlers;
+mod publisher;
pub mod repo;
+
+pub use publisher::{Publish, Publisher};
diff --git a/src/push/publisher.rs b/src/push/publisher.rs
new file mode 100644
index 0000000..4092724
--- /dev/null
+++ b/src/push/publisher.rs
@@ -0,0 +1,83 @@
+use futures::future::join_all;
+use itertools::Itertools as _;
+use serde::Serialize;
+use web_push::{
+ ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo,
+ WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder,
+};
+
+use crate::error::failed::{Failed, ResultExt as _};
+
+pub trait Publish {
+ fn publish<M>(
+ &self,
+ message: M,
+ signer: PartialVapidSignatureBuilder,
+ subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send,
+ ) -> impl Future<Output = Result<Vec<(SubscriptionInfo, WebPushError)>, Failed>> + Send
+ where
+ M: Serialize + Send + 'static;
+}
+
+#[derive(Clone)]
+pub struct Publisher {
+ client: IsahcWebPushClient,
+}
+
+impl Publisher {
+ pub fn new() -> Result<Self, WebPushError> {
+ let client = IsahcWebPushClient::new()?;
+ Ok(Self { client })
+ }
+
+ fn prepare_message(
+ payload: &[u8],
+ signer: &PartialVapidSignatureBuilder,
+ subscription: &SubscriptionInfo,
+ ) -> Result<WebPushMessage, Failed> {
+ let signature = signer
+ .clone()
+ .add_sub_info(subscription)
+ .build()
+ .fail("Failed to build VAPID signature")?;
+
+ let mut message = WebPushMessageBuilder::new(subscription);
+ message.set_payload(ContentEncoding::Aes128Gcm, payload);
+ message.set_vapid_signature(signature);
+ let message = message.build().fail("Failed to build push message")?;
+
+ Ok(message)
+ }
+}
+
+impl Publish for Publisher {
+ async fn publish<M>(
+ &self,
+ message: M,
+ signer: PartialVapidSignatureBuilder,
+ subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send,
+ ) -> Result<Vec<(SubscriptionInfo, WebPushError)>, Failed>
+ where
+ M: Serialize + Send + 'static,
+ {
+ let payload = serde_json::to_vec_pretty(&message)
+ .fail("Failed to encode web push message to JSON")?;
+
+ let messages: Vec<_> = subscriptions
+ .into_iter()
+ .map(|sub| Self::prepare_message(&payload, &signer, &sub).map(|message| (sub, message)))
+ .try_collect()?;
+
+ let deliveries = messages
+ .into_iter()
+ .map(async |(sub, message)| (sub, self.client.send(message).await));
+
+ let failures = join_all(deliveries)
+ .await
+ .into_iter()
+ .filter_map(|(sub, result)| result.err().map(|err| (sub, err)))
+ .collect();
+
+ Ok(failures)
+ }
+}
diff --git a/src/routes.rs b/src/routes.rs
index 1c07e78..0bf429a 100644
--- a/src/routes.rs
+++ b/src/routes.rs
@@ -3,15 +3,17 @@ use axum::{
response::Redirect,
routing::{delete, get, post},
};
-use web_push::WebPushClient;
use crate::{
- app::App, boot, conversation, event, expire, invite, login, message, push, setup, ui, vapid,
+ app::App,
+ boot, conversation, event, expire, invite, login, message,
+ push::{self, Publish},
+ setup, ui, vapid,
};
pub fn routes<P>(app: &App<P>) -> Router<App<P>>
where
- P: WebPushClient + Clone + Send + Sync + 'static,
+ P: Publish + Clone + Sync + Send + 'static,
{
// UI routes that can be accessed before the administrator completes setup.
let ui_bootstrap = Router::new()
diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs
index 53bf31b..85935d6 100644
--- a/src/test/fixtures/mod.rs
+++ b/src/test/fixtures/mod.rs
@@ -12,8 +12,20 @@ pub mod invite;
pub mod login;
pub mod message;
pub mod user;
+pub mod vapid;
pub async fn scratch_app() -> App<Client> {
+ let app = scratch_app_without_vapid().await;
+
+ app.vapid()
+ .refresh_key(&now())
+ .await
+ .expect("refreshing the VAPID key always succeeds");
+
+ app
+}
+
+pub async fn scratch_app_without_vapid() -> App<Client> {
let pool = db::prepare("sqlite::memory:", "sqlite::memory:")
.await
.expect("setting up in-memory sqlite database");
diff --git a/src/test/fixtures/vapid.rs b/src/test/fixtures/vapid.rs
new file mode 100644
index 0000000..29cdf1a
--- /dev/null
+++ b/src/test/fixtures/vapid.rs
@@ -0,0 +1,16 @@
+use p256::ecdsa::VerifyingKey;
+
+use crate::{app::App, test::fixtures};
+
+pub async fn key<P>(app: &App<P>) -> VerifyingKey {
+ let boot = app.boot().snapshot().await.expect("boot always succeeds");
+ let changed = boot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::vapid)
+ .filter_map(fixtures::event::vapid::changed)
+ .next_back()
+ .expect("the application has a vapid key");
+
+ changed.key
+}
diff --git a/src/test/webpush.rs b/src/test/webpush.rs
index a611ad0..f33f03c 100644
--- a/src/test/webpush.rs
+++ b/src/test/webpush.rs
@@ -1,13 +1,16 @@
use std::{
+ any::Any,
mem,
sync::{Arc, Mutex},
};
-use web_push::{WebPushClient, WebPushError, WebPushMessage};
+use web_push::{PartialVapidSignatureBuilder, SubscriptionInfo, WebPushError};
+
+use crate::{error::failed::Failed, push::Publish};
#[derive(Clone)]
pub struct Client {
- sent: Arc<Mutex<Vec<WebPushMessage>>>,
+ sent: Arc<Mutex<Vec<Publication>>>,
}
impl Client {
@@ -18,20 +21,48 @@ impl Client {
}
// Clears the list of sent messages (for all clones of this Client) when called, because we
- // can't clone `WebPushMessage`s so we either need to move them or try to reconstruct them,
- // either of which sucks but moving them sucks less.
- pub fn sent(&self) -> Vec<WebPushMessage> {
+ // can't clone `Publications`s, so we either need to move them or try to reconstruct them.
+ pub fn sent(&self) -> Vec<Publication> {
let mut sent = self.sent.lock().unwrap();
- mem::take(&mut *sent)
+ mem::take(&mut sent)
}
}
-#[async_trait::async_trait]
-impl WebPushClient for Client {
- async fn send(&self, message: WebPushMessage) -> Result<(), WebPushError> {
- let mut sent = self.sent.lock().unwrap();
- sent.push(message);
+impl Publish for Client {
+ async fn publish<M>(
+ &self,
+ message: M,
+ _: PartialVapidSignatureBuilder,
+ subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send,
+ ) -> Result<Vec<(SubscriptionInfo, WebPushError)>, Failed>
+ where
+ M: Send + 'static,
+ {
+ let message: Box<dyn Any + Send> = Box::new(message);
+ let subscriptions = subscriptions.into_iter().collect();
+ let publication = Publication {
+ message,
+ subscriptions,
+ };
+ self.sent.lock().unwrap().push(publication);
+
+ Ok(Vec::new())
+ }
+}
+
+pub struct Publication {
+ pub message: Box<dyn Any + Send>,
+ pub subscriptions: Vec<SubscriptionInfo>,
+}
- Ok(())
+impl Publication {
+ pub fn message_eq<M>(&self, candidate: &M) -> bool
+ where
+ M: PartialEq + 'static,
+ {
+ match self.message.downcast_ref::<M>() {
+ None => false,
+ Some(message) => message == candidate,
+ }
}
}