1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
|
use p256::ecdsa::VerifyingKey;
use sqlx::SqlitePool;
use web_push::{SubscriptionInfo, WebPushError};
use super::repo::Provider as _;
use crate::{
db,
error::failed::{ErrorExt as _, Failed, ResultExt as _},
event::Heartbeat,
login::Login,
push::publisher::Publish,
token::extract::Identity,
vapid::repo::Provider as _,
};
/// Service for registering push subscriptions and sending push messages.
///
/// Generic over the publisher type `P`. Subscribing works for any `P`; sending
/// (`ping`) additionally requires `P: Publish`.
pub struct Push<P> {
/// Connection pool from which each operation opens its own transaction.
db: SqlitePool,
/// Publisher used to deliver push messages to stored subscriptions.
publisher: P,
}
impl<P> Push<P> {
    /// Creates a push service that stores subscriptions in `db` and delivers messages via
    /// `publisher`.
    pub const fn new(db: SqlitePool, publisher: P) -> Self {
        Self { db, publisher }
    }

    /// Records `subscription` for `subscriber`.
    ///
    /// `vapid` is the VAPID key the client used when creating the subscription; it must match
    /// the current key loaded from the database. Re-submitting a subscription that is already
    /// stored with identical parameters succeeds without changing anything.
    ///
    /// # Errors
    ///
    /// - [`SubscribeError::StaleVapidKey`] if `vapid` differs from the current key. The error
    ///   carries the current key.
    /// - [`SubscribeError::Duplicate`] if the insert hits a unique violation and the
    ///   subscription found for this endpoint differs from `subscription`.
    /// - [`SubscribeError::Failed`] for any other failure (beginning or committing the
    ///   transaction, loading the VAPID key, or storing the subscription).
    ///
    /// On every error path the transaction is dropped without being committed.
    pub async fn subscribe(
        &self,
        subscriber: &Identity,
        subscription: &SubscriptionInfo,
        vapid: &VerifyingKey,
    ) -> Result<(), SubscribeError> {
        let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;

        let current = tx
            .vapid()
            .current()
            .await
            .fail("Failed to load current VAPID key")?;
        // Reject subscriptions created against a key that is no longer current, handing back
        // the current key in the error.
        if vapid != &current.key {
            return Err(SubscribeError::StaleVapidKey(current.key));
        }

        match tx.push().create(&subscriber.token, subscription).await {
            Ok(()) => (),
            Err(err) => {
                if let Some(err) = err.as_database_error()
                    && err.is_unique_violation()
                {
                    let current = tx
                        .push()
                        .by_endpoint(&subscriber.login, &subscription.endpoint)
                        .await
                        .fail("Failed to load existing subscriptions for endpoint")?;

                    // If we already have a subscription for this endpoint, with _different_
                    // parameters, then this is a client error. They shouldn't reuse endpoint
                    // URLs, per the various RFCs.
                    //
                    // However, if we have a subscription for this endpoint with the same
                    // parameters then we accept it and silently do nothing. This may happen
                    // if, for example, the subscribe request is retried due to a network
                    // interruption where it's not clear whether the original request
                    // succeeded.
                    if &current != subscription {
                        return Err(SubscribeError::Duplicate);
                    }
                } else {
                    return Err(err.fail("Failed to create push subscription"));
                }
            }
        }

        tx.commit().await.fail(db::failed::COMMIT)?;

        Ok(())
    }
}
impl<P> Push<P>
where
P: Publish,
{
pub async fn ping(&self, recipient: &Login) -> Result<(), PushError> {
let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
let signer = tx
.vapid()
.signer()
.await
.fail("Failed to load current VAPID signer")?;
let subscriptions = tx.push().by_login(recipient).await.fail_with(|| {
format!(
"Failed to find push subscriptions for login: {}",
recipient.id
)
})?;
// We're about to perform some fairly IO-intensive outside actions. Holding a tx open
// across them raises the risk that other clients will encounter errors due to locks from
// this transaction, so release it here. We'll open a new transaction if there's something
// we need to write.
tx.commit().await.fail(db::failed::COMMIT)?;
let failures = self
.publisher
.publish(Heartbeat::Heartbeat, signer, subscriptions)
.await
.fail("Failed to send push message")?;
if !failures.is_empty() {
let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
// Note that data integrity guarantees from the original transaction to read
// subscriptions may no longer be valid now. Time has passed. Depending on how slow
// delivering push notifications is, potentially a _lot_ of time has passed.
for (sub, err) in &failures {
match err {
// I _think_ this is the complete set of permanent failures. See
// <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete
// list.
WebPushError::Unauthorized(_)
| WebPushError::InvalidUri
| WebPushError::EndpointNotValid(_)
| WebPushError::EndpointNotFound(_)
| WebPushError::InvalidCryptoKeys
| WebPushError::MissingCryptoKeys => {
tx.push()
.unsubscribe(sub)
.await
.fail("Failed to unsubscribe after permanent push message rejection")?;
}
_ => (),
}
}
tx.commit().await.fail(db::failed::COMMIT)?;
return Err(PushError::Delivery(
failures.into_iter().map(|(_, err)| err).collect(),
));
}
Ok(())
}
}
/// Reasons a `Push::subscribe` call can fail.
#[derive(Debug, thiserror::Error)]
pub enum SubscribeError {
/// The VAPID key supplied with the request does not match the current key. Carries the
/// current key.
#[error("subscription created with stale VAPID key")]
StaleVapidKey(VerifyingKey),
/// A subscription is already stored for this endpoint with parameters that differ from the
/// ones submitted.
#[error("subscription already exists for endpoint")]
// The endpoint URL is not included in the error, as it is a bearer credential in its own right
// and we want to limit its proliferation. The only intended recipient of this message is the
// client, which already knows the endpoint anyway and doesn't need us to tell them.
Duplicate,
/// Any other failure; the wrapped error's message and source are passed through unchanged.
#[error(transparent)]
Failed(#[from] Failed),
}
/// Reasons a `Push::ping` call can fail.
#[derive(Debug, thiserror::Error)]
pub enum PushError {
/// The publisher reported one or more failed deliveries. Holds the error reported for each
/// failed delivery.
#[error("push message delivery failures: {0:?}")]
Delivery(Vec<WebPushError>),
/// Any other failure; the wrapped error's message and source are passed through unchanged.
#[error(transparent)]
Failed(#[from] Failed),
}
|