1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
use p256::ecdsa::VerifyingKey;
use sqlx::SqlitePool;
use web_push::SubscriptionInfo;
use super::repo::Provider as _;
use crate::{
db,
error::failed::{ErrorExt as _, Failed, ResultExt as _},
event::Heartbeat,
login::Login,
push::publisher::Publish,
token::extract::Identity,
vapid::repo::Provider as _,
};
/// Push-notification service pairing a SQLite connection pool with a publisher of type `P`.
///
/// `subscribe` is available for any `P`; `ping` additionally requires `P: Publish`.
pub struct Push<P> {
// Connection pool; each operation begins its own transaction on it.
db: SqlitePool,
// Publisher that `ping` hands the outgoing message, signer, and subscriptions to.
publisher: P,
}
impl<P> Push<P> {
    /// Creates a push service from a database pool and a publisher.
    pub const fn new(db: SqlitePool, publisher: P) -> Self {
        Self { db, publisher }
    }

    /// Records `subscription` as a push subscription for `subscriber`.
    ///
    /// `vapid` is the VAPID key the client used when it created the subscription. All database
    /// work happens inside one transaction, which is committed only on success.
    ///
    /// Re-submitting a subscription that is already stored with identical parameters is accepted
    /// and does nothing, so clients can safely retry a request whose outcome they never saw.
    ///
    /// # Errors
    ///
    /// * [`SubscribeError::StaleVapidKey`] if `vapid` differs from the current VAPID key; the
    ///   error carries the current key.
    /// * [`SubscribeError::Duplicate`] if a subscription already exists for the same endpoint
    ///   with different parameters.
    /// * [`SubscribeError::Failed`] if a database operation fails.
    pub async fn subscribe(
        &self,
        subscriber: &Identity,
        subscription: &SubscriptionInfo,
        vapid: &VerifyingKey,
    ) -> Result<(), SubscribeError> {
        let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;

        let current = tx
            .vapid()
            .current()
            .await
            .fail("Failed to load current VAPID key")?;
        // A subscription created against a key other than the current one is rejected; the
        // current key is returned in the error.
        if vapid != &current.key {
            return Err(SubscribeError::StaleVapidKey(current.key));
        }

        let created = tx.push().create(&subscriber.token, subscription).await;
        if let Err(err) = created {
            if let Some(err) = err.as_database_error()
                && err.is_unique_violation()
            {
                let current = tx
                    .push()
                    .by_endpoint(&subscriber.login, &subscription.endpoint)
                    .await
                    .fail("Failed to load existing subscriptions for endpoint")?;

                // If we already have a subscription for this endpoint, with _different_
                // parameters, then this is a client error. They shouldn't reuse endpoint URLs,
                // per the various RFCs.
                //
                // However, if we have a subscription for this endpoint with the same parameters
                // then we accept it and silently do nothing. This may happen if, for example,
                // the subscribe request is retried due to a network interruption where it's
                // not clear whether the original request succeeded.
                if &current != subscription {
                    return Err(SubscribeError::Duplicate);
                }
            } else {
                // Any failure other than a unique-constraint violation is a genuine error.
                return Err(err.fail("Failed to create push subscription"));
            }
        }

        tx.commit().await.fail(db::failed::COMMIT)?;

        Ok(())
    }
}
impl<P> Push<P>
where
    P: Publish,
{
    /// Publishes a heartbeat push message to the subscriptions looked up for `recipient`.
    ///
    /// The VAPID signer and the subscriptions are read inside one transaction, which is
    /// committed before the publisher is invoked.
    ///
    /// # Errors
    ///
    /// Returns [`Failed`] if the transaction cannot be begun or committed, if the signer or the
    /// subscriptions cannot be loaded, or if the publisher reports a failure.
    pub async fn ping(&self, recipient: &Login) -> Result<(), Failed> {
        let mut txn = self.db.begin().await.fail(db::failed::BEGIN)?;

        let loaded_signer = txn.vapid().signer().await;
        let signer = loaded_signer.fail("Failed to load current VAPID signer")?;

        // Only build the failure message if the lookup actually fails.
        let lookup_failure = || {
            format!(
                "Failed to find push subscriptions for login: {}",
                recipient.id
            )
        };
        let found = txn.push().by_login(recipient).await;
        let subscriptions = found.fail_with(lookup_failure)?;

        // Publishing is comparatively IO-heavy work outside the database. Keeping the
        // transaction open while it runs would make it more likely that other clients hit
        // lock-related errors, so finish the transaction first. Any later write would need a
        // transaction of its own.
        txn.commit().await.fail(db::failed::COMMIT)?;

        let delivery = self
            .publisher
            .publish(Heartbeat::Heartbeat, &signer, &subscriptions)
            .await;
        delivery.fail("Failed to send push message")?;

        Ok(())
    }
}
/// Errors returned by `Push::subscribe`.
#[derive(Debug, thiserror::Error)]
pub enum SubscribeError {
/// The VAPID key supplied with the request differs from the current one. The payload is the
/// current key.
#[error("subscription created with stale VAPID key")]
StaleVapidKey(VerifyingKey),
/// A subscription with different parameters is already stored for the same endpoint.
#[error("subscription already exists for endpoint")]
// The endpoint URL is not included in the error, as it is a bearer credential in its own right
// and we want to limit its proliferation. The only intended recipient of this message is the
// client, which already knows the endpoint anyways and doesn't need us to tell them.
Duplicate,
/// Any other failure (for example a database error), displayed transparently as the wrapped
/// `Failed` value.
#[error(transparent)]
Failed(#[from] Failed),
}
|