summaryrefslogtreecommitdiff
path: root/src/push/app.rs
blob: 19830550f3ec85fde6b0896f117e3044f83b5866 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use p256::ecdsa::VerifyingKey;
use sqlx::SqlitePool;
use web_push::SubscriptionInfo;

use super::repo::Provider as _;
use crate::{
    db,
    error::failed::{ErrorExt as _, Failed, ResultExt as _},
    event::Heartbeat,
    login::Login,
    push::publisher::Publish,
    token::extract::Identity,
    vapid::repo::Provider as _,
};

pub struct Push<P> {
    db: SqlitePool,
    publisher: P,
}

impl<P> Push<P> {
    pub const fn new(db: SqlitePool, publisher: P) -> Self {
        Self { db, publisher }
    }

    pub async fn subscribe(
        &self,
        subscriber: &Identity,
        subscription: &SubscriptionInfo,
        vapid: &VerifyingKey,
    ) -> Result<(), SubscribeError> {
        let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;

        let current = tx
            .vapid()
            .current()
            .await
            .fail("Failed to load current VAPID key")?;
        if vapid != &current.key {
            return Err(SubscribeError::StaleVapidKey(current.key));
        }

        match tx.push().create(&subscriber.token, subscription).await {
            Ok(()) => (),
            Err(err) => {
                if let Some(err) = err.as_database_error()
                    && err.is_unique_violation()
                {
                    let current = tx
                        .push()
                        .by_endpoint(&subscriber.login, &subscription.endpoint)
                        .await
                        .fail("Failed to load existing subscriptions for endpoint")?;
                    // If we already have a subscription for this endpoint, with _different_
                    // parameters, then this is a client error. They shouldn't reuse endpoint URLs,
                    // per the various RFCs.
                    //
                    // However, if we have a subscription for this endpoint with the same parameters
                    // then we accept it and silently do nothing. This may happen if, for example,
                    // the subscribe request is retried due to a network interruption where it's
                    // not clear whether the original request succeeded.
                    if &current != subscription {
                        return Err(SubscribeError::Duplicate);
                    }
                } else {
                    return Err(err.fail("Failed to create push subscription"));
                }
            }
        }

        tx.commit().await.fail(db::failed::COMMIT)?;

        Ok(())
    }
}

impl<P> Push<P>
where
    P: Publish,
{
    pub async fn ping(&self, recipient: &Login) -> Result<(), Failed> {
        let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;

        let signer = tx
            .vapid()
            .signer()
            .await
            .fail("Failed to load current VAPID signer")?;
        let subscriptions = tx.push().by_login(recipient).await.fail_with(|| {
            format!(
                "Failed to find push subscriptions for login: {}",
                recipient.id
            )
        })?;

        // We're about to perform some fairly IO-intensive outside actions. Holding a tx open
        // across them raises the risk that other clients will encounter errors due to locks from
        // this transaction, so release it here. We'll open a new transaction if there's something
        // we need to write.
        tx.commit().await.fail(db::failed::COMMIT)?;

        self.publisher
            .publish(Heartbeat::Heartbeat, &signer, &subscriptions)
            .await
            .fail("Failed to send push message")?;
        Ok(())
    }
}

#[derive(Debug, thiserror::Error)]
pub enum SubscribeError {
    #[error("subscription created with stale VAPID key")]
    StaleVapidKey(VerifyingKey),
    #[error("subscription already exists for endpoint")]
    // The endpoint URL is not included in the error, as it is a bearer credential in its own right
    // and we want to limit its proliferation. The only intended recipient of this message is the
    // client, which already knows the endpoint anyways and doesn't need us to tell them.
    Duplicate,
    #[error(transparent)]
    Failed(#[from] Failed),
}