summaryrefslogtreecommitdiff
path: root/src/push
diff options
context:
space:
mode:
Diffstat (limited to 'src/push')
-rw-r--r--src/push/app.rs2
-rw-r--r--src/push/handlers/ping/test.rs24
-rw-r--r--src/push/publisher.rs20
-rw-r--r--src/push/repo.rs21
4 files changed, 45 insertions, 22 deletions
diff --git a/src/push/app.rs b/src/push/app.rs
index ebfc220..f7846a6 100644
--- a/src/push/app.rs
+++ b/src/push/app.rs
@@ -101,7 +101,7 @@ where
let failures = self
.publisher
- .publish(Heartbeat::Heartbeat, signer, subscriptions)
+ .publish(Heartbeat::Heartbeat, &signer, &subscriptions)
.await
.fail("Failed to send push message")?;
diff --git a/src/push/handlers/ping/test.rs b/src/push/handlers/ping/test.rs
index c985aaf..cc07ef0 100644
--- a/src/push/handlers/ping/test.rs
+++ b/src/push/handlers/ping/test.rs
@@ -26,7 +26,7 @@ async fn ping_without_subscriptions() {
.sent()
.into_iter()
.filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.subscriptions.is_empty())
+ && publish.recipients.is_empty())
.exactly_one()
.is_ok()
);
@@ -64,7 +64,7 @@ async fn ping() {
.sent()
.into_iter()
.filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.subscriptions == subscriptions)
+ && publish.recipients == subscriptions)
.exactly_one()
.is_ok()
);
@@ -110,7 +110,7 @@ async fn ping_multiple_subscriptions() {
.sent()
.into_iter()
.filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.subscriptions == subscriptions)
+ && publish.recipients == subscriptions)
.exactly_one()
.is_ok()
);
@@ -160,7 +160,7 @@ async fn ping_recipient_only() {
assert!(
sent.iter()
.filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.subscriptions == recipient_subscriptions)
+ && publish.recipients == recipient_subscriptions)
.exactly_one()
.is_ok()
);
@@ -169,7 +169,7 @@ async fn ping_recipient_only() {
assert!(
!sent
.iter()
- .any(|publish| publish.subscriptions.contains(&spectator_subscription))
+ .any(|publish| publish.recipients.contains(&spectator_subscription))
);
}
@@ -212,7 +212,7 @@ async fn ping_permanent_error() {
assert!(
sent.iter()
.filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.subscriptions == subscriptions)
+ && publish.recipients == subscriptions)
.exactly_one()
.is_ok()
);
@@ -230,7 +230,7 @@ async fn ping_permanent_error() {
assert!(
!sent
.iter()
- .any(|publish| publish.subscriptions.contains(&subscription))
+ .any(|publish| publish.recipients.contains(&subscription))
);
}
@@ -275,7 +275,7 @@ async fn ping_temporary_error() {
assert!(
sent.iter()
.filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.subscriptions == subscriptions)
+ && publish.recipients == subscriptions)
.exactly_one()
.is_ok()
);
@@ -293,7 +293,7 @@ async fn ping_temporary_error() {
assert!(
sent.iter()
.filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.subscriptions == subscriptions)
+ && publish.recipients == subscriptions)
.exactly_one()
.is_ok()
);
@@ -345,7 +345,7 @@ async fn ping_multiple_subscriptions_with_failure() {
.sent()
.iter()
.filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.subscriptions == subscriptions)
+ && publish.recipients == subscriptions)
.exactly_one()
.is_ok()
);
@@ -362,13 +362,13 @@ async fn ping_multiple_subscriptions_with_failure() {
assert!(
sent.iter()
.filter(|publish| publish.message_eq(&Heartbeat::Heartbeat)
- && publish.subscriptions == subscriptions)
+ && publish.recipients == subscriptions)
.exactly_one()
.is_ok()
);
assert!(
!sent
.iter()
- .any(|publish| publish.subscriptions.contains(&failing))
+ .any(|publish| publish.recipients.contains(&failing))
);
}
diff --git a/src/push/publisher.rs b/src/push/publisher.rs
index 4092724..d6227a2 100644
--- a/src/push/publisher.rs
+++ b/src/push/publisher.rs
@@ -8,13 +8,14 @@ use web_push::{
use crate::error::failed::{Failed, ResultExt as _};
+#[async_trait::async_trait]
pub trait Publish {
- fn publish<M>(
+ async fn publish<'s, M>(
&self,
message: M,
- signer: PartialVapidSignatureBuilder,
- subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send,
- ) -> impl Future<Output = Result<Vec<(SubscriptionInfo, WebPushError)>, Failed>> + Send
+ signer: &PartialVapidSignatureBuilder,
+ subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send,
+ ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed>
where
M: Serialize + Send + 'static;
}
@@ -50,13 +51,14 @@ impl Publisher {
}
}
+#[async_trait::async_trait]
impl Publish for Publisher {
- async fn publish<M>(
+ async fn publish<'s, M>(
&self,
message: M,
- signer: PartialVapidSignatureBuilder,
- subscriptions: impl IntoIterator<Item = SubscriptionInfo> + Send,
- ) -> Result<Vec<(SubscriptionInfo, WebPushError)>, Failed>
+ signer: &PartialVapidSignatureBuilder,
+ subscriptions: impl IntoIterator<Item = &'s SubscriptionInfo> + Send,
+ ) -> Result<Vec<(&'s SubscriptionInfo, WebPushError)>, Failed>
where
M: Serialize + Send + 'static,
{
@@ -65,7 +67,7 @@ impl Publish for Publisher {
let messages: Vec<_> = subscriptions
.into_iter()
- .map(|sub| Self::prepare_message(&payload, &signer, &sub).map(|message| (sub, message)))
+ .map(|sub| Self::prepare_message(&payload, signer, sub).map(|message| (sub, message)))
.try_collect()?;
let deliveries = messages
diff --git a/src/push/repo.rs b/src/push/repo.rs
index 4183489..8850059 100644
--- a/src/push/repo.rs
+++ b/src/push/repo.rs
@@ -83,6 +83,27 @@ impl Push<'_> {
Ok(info)
}
+ pub async fn broadcast_from(
+ &mut self,
+ originator: &Login,
+ ) -> Result<Vec<SubscriptionInfo>, sqlx::Error> {
+ sqlx::query!(
+ r#"
+ select
+ sub.endpoint,
+ sub.p256dh,
+ sub.auth
+ from push_subscription as sub
+ join token on sub.token = token.id
+ where token.login <> $1
+ "#,
+ originator.id,
+ )
+ .map(|row| SubscriptionInfo::new(row.endpoint, row.p256dh, row.auth))
+ .fetch_all(&mut *self.0)
+ .await
+ }
+
pub async fn unsubscribe(
&mut self,
subscription: &SubscriptionInfo,