1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
use futures::future::join_all;
use itertools::Itertools as _;
use serde::Serialize;
use sqlx::SqlitePool;
use web_push::{
ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo,
WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder,
};
use crate::{
db,
error::failed::{Failed, ResultExt as _},
push::repo::Provider,
};
/// Abstraction over delivering one message to a set of web push subscriptions.
///
/// Having this as a trait lets callers be written against `Publish` rather than a concrete
/// publisher type.
#[async_trait::async_trait]
pub trait Publish {
/// Sends `message` to every subscription in `subscriptions`.
///
/// * `message` - any serializable value; it is taken by value and must be `Send + 'static`.
/// * `signer` - partially-built VAPID signature builder, used to sign for each subscription.
/// * `subscriptions` - the subscriptions to deliver to.
///
/// Returns `Ok(())` on success and a [`Failed`] error otherwise; which conditions count as
/// failure is up to the implementation.
async fn publish<M>(
&self,
message: M,
signer: &PartialVapidSignatureBuilder,
subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send,
) -> Result<(), Failed>
where
M: Serialize + Send + 'static;
}
/// Web push publisher backed by an HTTP push client and the application's SQLite pool.
///
/// Cloning clones both the pool and the client.
#[derive(Clone)]
pub struct Publisher {
// Database pool; used to remove subscriptions that permanently reject messages.
db: SqlitePool,
// Client used to send the prepared push messages.
client: IsahcWebPushClient,
}
impl Publisher {
    /// Creates a publisher that sends push messages through a new [`IsahcWebPushClient`] and
    /// uses `db` to remove subscriptions that permanently reject messages.
    ///
    /// # Errors
    ///
    /// Returns the [`WebPushError`] from [`IsahcWebPushClient::new`] if the client cannot be
    /// constructed.
    pub fn new(db: SqlitePool) -> Result<Self, WebPushError> {
        let client = IsahcWebPushClient::new()?;
        Ok(Self { db, client })
    }

    /// Builds the signed, encrypted push message carrying `payload` for one `subscription`.
    ///
    /// The signer is cloned per subscription because completing a VAPID signature consumes
    /// the builder.
    ///
    /// # Errors
    ///
    /// Fails if the VAPID signature or the push message itself cannot be built.
    fn prepare_message(
        payload: &[u8],
        signer: &PartialVapidSignatureBuilder,
        subscription: &SubscriptionInfo,
    ) -> Result<WebPushMessage, Failed> {
        let signature = signer
            .clone()
            .add_sub_info(subscription)
            .build()
            .fail("Failed to build VAPID signature")?;

        let mut message = WebPushMessageBuilder::new(subscription);
        message.set_payload(ContentEncoding::Aes128Gcm, payload);
        message.set_vapid_signature(signature);
        let message = message.build().fail("Failed to build push message")?;

        Ok(message)
    }

    /// Reports whether `err` means the subscription will never accept messages again.
    ///
    /// I _think_ this is the complete set of permanent failures. See
    /// <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete list.
    fn is_permanent_failure(err: &WebPushError) -> bool {
        matches!(
            err,
            WebPushError::Unauthorized(_)
                | WebPushError::InvalidUri
                | WebPushError::EndpointNotValid(_)
                | WebPushError::EndpointNotFound(_)
                | WebPushError::InvalidCryptoKeys
                | WebPushError::MissingCryptoKeys
        )
    }

    /// Cleans up after failed deliveries by unsubscribing every subscription whose failure is
    /// permanent. All other failures are deliberately ignored.
    ///
    /// The database is only touched when at least one failure is permanent, so a batch of
    /// purely transient failures can never surface a database error.
    ///
    /// # Errors
    ///
    /// Fails if the transaction cannot be started or committed, or if removing a subscription
    /// fails.
    async fn settle_failed(
        &self,
        mut failures: Vec<(&SubscriptionInfo, WebPushError)>,
    ) -> Result<(), Failed> {
        failures.retain(|(_, err)| Self::is_permanent_failure(err));
        if failures.is_empty() {
            return Ok(());
        }

        let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;

        // Note that data integrity guarantees from whatever transaction originally read the
        // subscriptions may no longer be valid now. Time has passed. Depending on how slow
        // delivering push notifications is, potentially a _lot_ of time has passed.
        for (sub, _) in &failures {
            tx.push()
                .unsubscribe(sub)
                .await
                .fail("Failed to unsubscribe after permanent push message rejection")?;
        }

        tx.commit().await.fail(db::failed::COMMIT)?;

        Ok(())
    }
}
#[async_trait::async_trait]
impl Publish for Publisher {
    /// Encodes `message` as JSON once and delivers it to every subscription concurrently.
    ///
    /// Subscriptions that permanently reject the message are removed afterwards; other
    /// delivery failures are ignored and do not make this call fail.
    ///
    /// # Errors
    ///
    /// Fails if `message` cannot be encoded, if a message cannot be prepared for any
    /// subscription (in which case nothing is sent), or if cleaning up rejected subscriptions
    /// fails.
    async fn publish<M>(
        &self,
        message: M,
        signer: &PartialVapidSignatureBuilder,
        subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send,
    ) -> Result<(), Failed>
    where
        M: Serialize + Send + 'static,
    {
        // Compact encoding: push payloads are size-limited, and pretty-printing whitespace
        // would only eat into that budget without changing the decoded value.
        let payload =
            serde_json::to_vec(&message).fail("Failed to encode web push message to JSON")?;

        // Prepare every message up front so a signing or encoding problem aborts the whole
        // publish before anything has been sent.
        let messages: Vec<_> = subscriptions
            .into_iter()
            .map(|sub| Self::prepare_message(&payload, signer, sub).map(|message| (sub, message)))
            .try_collect()?;

        let deliveries = messages
            .into_iter()
            .map(async |(sub, message)| (sub, self.client.send(message).await));

        // Keep only the deliveries that failed, paired with the subscription they were for.
        let failures: Vec<_> = join_all(deliveries)
            .await
            .into_iter()
            .filter_map(|(sub, result)| result.err().map(|err| (sub, err)))
            .collect();

        self.settle_failed(failures).await?;

        Ok(())
    }
}
|