summaryrefslogtreecommitdiff
path: root/src/push/publisher.rs
blob: ef23f2ffe7e3ca645af167afbe5c523ca0b482ea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use futures::future::join_all;
use itertools::Itertools as _;
use serde::Serialize;
use sqlx::SqlitePool;
use web_push::{
    ContentEncoding, IsahcWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo,
    WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder,
};

use crate::{
    db,
    error::failed::{Failed, ResultExt as _},
    push::repo::Provider,
};

/// Delivers a serializable message to a collection of web push subscriptions.
///
/// [`Publisher`] is the implementation provided in this file.
#[async_trait::async_trait]
pub trait Publish {
    /// Publishes `message` to every subscription in `subscriptions`.
    ///
    /// * `message` - the value to deliver; it only needs to be serializable, so the wire
    ///   encoding is chosen by the implementation.
    /// * `signer` - a partially configured VAPID signature builder. [`Publisher`] completes a
    ///   clone of it for each subscription.
    /// * `subscriptions` - the subscriptions to deliver to.
    ///
    /// # Errors
    ///
    /// Returns [`Failed`] when publishing cannot be completed. Which conditions count as a
    /// failure is up to the implementation.
    async fn publish<M>(
        &self,
        message: M,
        signer: &PartialVapidSignatureBuilder,
        subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send,
    ) -> Result<(), Failed>
    where
        M: Serialize + Send + 'static;
}

/// Web push publisher that sends messages through an [`IsahcWebPushClient`] and uses a SQLite
/// pool to remove subscriptions whose deliveries were permanently rejected.
#[derive(Clone)]
pub struct Publisher {
    /// Database pool, used after delivery to unsubscribe permanently rejected subscriptions.
    db: SqlitePool,
    /// Web push client used to send each prepared message.
    client: IsahcWebPushClient,
}

impl Publisher {
    /// Creates a publisher that uses `db` for subscription bookkeeping and delivers messages
    /// through a newly constructed [`IsahcWebPushClient`].
    ///
    /// # Errors
    ///
    /// Returns the [`WebPushError`] from [`IsahcWebPushClient::new`] if the client cannot be
    /// constructed.
    pub fn new(db: SqlitePool) -> Result<Self, WebPushError> {
        let client = IsahcWebPushClient::new()?;
        Ok(Self { db, client })
    }

    /// Builds the push message that carries `payload` to a single `subscription`.
    ///
    /// The VAPID signature is built from a clone of `signer` with the subscription's details
    /// added, so the caller's builder can be reused for every subscription. The payload is set
    /// with [`ContentEncoding::Aes128Gcm`].
    ///
    /// # Errors
    ///
    /// Fails if either the VAPID signature or the message itself cannot be built.
    fn prepare_message(
        payload: &[u8],
        signer: &PartialVapidSignatureBuilder,
        subscription: &SubscriptionInfo,
    ) -> Result<WebPushMessage, Failed> {
        let signature = signer
            .clone()
            .add_sub_info(subscription)
            .build()
            .fail("Failed to build VAPID signature")?;

        let mut message = WebPushMessageBuilder::new(subscription);
        message.set_payload(ContentEncoding::Aes128Gcm, payload);
        message.set_vapid_signature(signature);
        let message = message.build().fail("Failed to build push message")?;

        Ok(message)
    }

    /// Reports whether `err` is treated as a permanent rejection of a subscription.
    ///
    /// I _think_ this is the complete set of permanent failures. See
    /// <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete list.
    /// Every other error is treated as transient.
    fn is_permanent(err: &WebPushError) -> bool {
        matches!(
            err,
            WebPushError::Unauthorized(_)
                | WebPushError::InvalidUri
                | WebPushError::EndpointNotValid(_)
                | WebPushError::EndpointNotFound(_)
                | WebPushError::InvalidCryptoKeys
                | WebPushError::MissingCryptoKeys
        )
    }

    /// Cleans up after failed deliveries by unsubscribing every subscription whose failure is
    /// permanent (see [`Self::is_permanent`]).
    ///
    /// Transient failures are deliberately ignored: the subscription is kept and nothing is
    /// reported. All removals happen in a single transaction. The database is not touched at
    /// all when there is nothing to remove.
    ///
    /// # Errors
    ///
    /// Fails if the transaction cannot be started or committed, or if an unsubscribe fails.
    async fn settle_failed(
        &self,
        failures: Vec<(&SubscriptionInfo, WebPushError)>,
    ) -> Result<(), Failed> {
        let rejected: Vec<&SubscriptionInfo> = failures
            .iter()
            .filter_map(|(sub, err)| Self::is_permanent(err).then_some(*sub))
            .collect();

        // Only open a transaction when there is something to write; a batch of purely
        // transient failures should not cost a database round-trip or be able to fail on one.
        if rejected.is_empty() {
            return Ok(());
        }

        let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
        // Note that data integrity guarantees from whatever transaction originally read the
        // subscriptions may no longer be valid now. Time has passed. Depending on how slow
        // delivering push notifications is, potentially a _lot_ of time has passed.

        for sub in &rejected {
            tx.push()
                .unsubscribe(sub)
                .await
                .fail("Failed to unsubscribe after permanent push message rejection")?;
        }

        tx.commit().await.fail(db::failed::COMMIT)?;

        Ok(())
    }
}

#[async_trait::async_trait]
impl Publish for Publisher {
    /// Encodes `message` as JSON once, sends it to every subscription concurrently, then
    /// unsubscribes the subscriptions whose delivery was permanently rejected.
    ///
    /// All messages are prepared before any is sent, so a single message that fails to build
    /// aborts the whole publish and nothing is delivered. Delivery failures, by contrast, do
    /// not fail the publish; they are handed to `settle_failed` for cleanup.
    ///
    /// # Errors
    ///
    /// Fails if `message` cannot be encoded as JSON, if any push message cannot be prepared,
    /// or if the post-delivery cleanup fails.
    async fn publish<M>(
        &self,
        message: M,
        signer: &PartialVapidSignatureBuilder,
        subscriptions: impl IntoIterator<Item = &'_ SubscriptionInfo> + Send,
    ) -> Result<(), Failed>
    where
        M: Serialize + Send + 'static,
    {
        // Compact encoding: pretty-printing whitespace carries no meaning for a JSON consumer
        // but counts against the push payload size limit (`WebPushError::PayloadTooLarge`).
        let payload =
            serde_json::to_vec(&message).fail("Failed to encode web push message to JSON")?;

        // Keep each subscription paired with its message so failures can be attributed later.
        let messages: Vec<_> = subscriptions
            .into_iter()
            .map(|sub| Self::prepare_message(&payload, signer, sub).map(|message| (sub, message)))
            .try_collect()?;

        let deliveries = messages
            .into_iter()
            .map(async |(sub, message)| (sub, self.client.send(message).await));

        // Wait for every delivery to finish and keep only the ones that failed.
        let failures: Vec<_> = join_all(deliveries)
            .await
            .into_iter()
            .filter_map(|(sub, result)| result.err().map(|err| (sub, err)))
            .collect();

        self.settle_failed(failures).await?;

        Ok(())
    }
}