diff options
Diffstat (limited to 'src/push')
| -rw-r--r-- | src/push/app.rs | 2 | ||||
| -rw-r--r-- | src/push/handlers/ping/test.rs | 24 | ||||
| -rw-r--r-- | src/push/publisher.rs | 20 | ||||
| -rw-r--r-- | src/push/repo.rs | 21 |
4 files changed, 45 insertions, 22 deletions
diff --git a/src/push/app.rs b/src/push/app.rs index ebfc220..f7846a6 100644 --- a/src/push/app.rs +++ b/src/push/app.rs @@ -101,7 +101,7 @@ where let failures = self .publisher - .publish(Heartbeat::Heartbeat, signer, subscriptions) + .publish(Heartbeat::Heartbeat, &signer, &subscriptions) .await .fail("Failed to send push message")?; diff --git a/src/push/handlers/ping/test.rs b/src/push/handlers/ping/test.rs index c985aaf..cc07ef0 100644 --- a/src/push/handlers/ping/test.rs +++ b/src/push/handlers/ping/test.rs @@ -26,7 +26,7 @@ async fn ping_without_subscriptions() { .sent() .into_iter() .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.subscriptions.is_empty()) + && publish.recipients.is_empty()) .exactly_one() .is_ok() ); @@ -64,7 +64,7 @@ async fn ping() { .sent() .into_iter() .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.subscriptions == subscriptions) + && publish.recipients == subscriptions) .exactly_one() .is_ok() ); @@ -110,7 +110,7 @@ async fn ping_multiple_subscriptions() { .sent() .into_iter() .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.subscriptions == subscriptions) + && publish.recipients == subscriptions) .exactly_one() .is_ok() ); @@ -160,7 +160,7 @@ async fn ping_recipient_only() { assert!( sent.iter() .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.subscriptions == recipient_subscriptions) + && publish.recipients == recipient_subscriptions) .exactly_one() .is_ok() ); @@ -169,7 +169,7 @@ async fn ping_recipient_only() { assert!( !sent .iter() - .any(|publish| publish.subscriptions.contains(&spectator_subscription)) + .any(|publish| publish.recipients.contains(&spectator_subscription)) ); } @@ -212,7 +212,7 @@ async fn ping_permanent_error() { assert!( sent.iter() .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.subscriptions == subscriptions) + && publish.recipients == subscriptions) .exactly_one() .is_ok() ); @@ -230,7 +230,7 @@ async fn ping_permanent_error() { assert!( !sent .iter() - .any(|publish| publish.subscriptions.contains(&subscription)) + .any(|publish| publish.recipients.contains(&subscription)) ); } @@ -275,7 +275,7 @@ async fn ping_temporary_error() { assert!( sent.iter() .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.subscriptions == subscriptions) + && publish.recipients == subscriptions) .exactly_one() .is_ok() ); @@ -293,7 +293,7 @@ async fn ping_temporary_error() { assert!( sent.iter() .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.subscriptions == subscriptions) + && publish.recipients == subscriptions) .exactly_one() .is_ok() ); @@ -345,7 +345,7 @@ async fn ping_multiple_subscriptions_with_failure() { .sent() .iter() .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.subscriptions == subscriptions) + && publish.recipients == subscriptions) .exactly_one() .is_ok() ); @@ -362,13 +362,13 @@ async fn ping_multiple_subscriptions_with_failure() { assert!( sent.iter() .filter(|publish| publish.message_eq(&Heartbeat::Heartbeat) - && publish.subscriptions == subscriptions) + && publish.recipients == subscriptions) .exactly_one() .is_ok() ); assert!( !sent .iter() - .any(|publish| publish.subscriptions.contains(&failing)) + .any(|publish| publish.recipients.contains(&failing)) ); } diff --git a/src/push/publisher.rs b/src/push/publisher.rs index 4092724..d6227a2 100644 --- a/src/push/publisher.rs +++ b/src/push/publisher.rs @@ -8,13 +8,14 @@ use web_push::{ use crate::error::failed::{Failed, ResultExt as _}; +#[async_trait::async_trait] pub trait Publish { - fn publish<M>( + async fn publish<'s, M>( &self, message: M, - signer: PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send, - ) -> impl Future<Output = Result<Vec<(SubscriptionInfo, WebPushError)>, Failed>> + Send + signer: &PartialVapidSignatureBuilder, + subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send, + ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed> where M: Serialize + Send + 'static; } @@ -50,13 +51,14 @@ impl Publisher { } } +#[async_trait::async_trait] impl Publish for Publisher { - async fn publish<M>( + async fn publish<'s, M>( &self, message: M, - signer: PartialVapidSignatureBuilder, - subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send, - ) -> Result<Vec<(SubscriptionInfo, WebPushError)>, Failed> + signer: &PartialVapidSignatureBuilder, + subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send, + ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed> where M: Serialize + Send + 'static, { @@ -65,7 +67,7 @@ impl Publish for Publisher { let messages: Vec<_> = subscriptions .into_iter() - .map(|sub| Self::prepare_message(&payload, &signer, &sub).map(|message| (sub, message))) + .map(|sub| Self::prepare_message(&payload, signer, sub).map(|message| (sub, message))) .try_collect()?; let deliveries = messages diff --git a/src/push/repo.rs b/src/push/repo.rs index 4183489..8850059 100644 --- a/src/push/repo.rs +++ b/src/push/repo.rs @@ -83,6 +83,27 @@ impl Push<'_> { Ok(info) } + pub async fn broadcast_from( + &mut self, + originator: &Login, + ) -> Result<Vec<SubscriptionInfo>, sqlx::Error> { + sqlx::query!( + r#" + select + sub.endpoint, + sub.p256dh, + sub.auth + from push_subscription as sub + join token on sub.token = token.id + where token.login <> $1 + "#, + originator.id, + ) + .map(|row| SubscriptionInfo::new(row.endpoint, row.p256dh, row.auth)) + .fetch_all(&mut *self.0) + .await + } + pub async fn unsubscribe( &mut self, subscription: &SubscriptionInfo, |
