summaryrefslogtreecommitdiff
path: root/src/vapid
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2025-11-08 16:28:10 -0500
committerOwen Jacobson <owen@grimoire.ca>2025-11-08 16:28:10 -0500
commitfc6914831743f6d683c59adb367479defe6f8b3a (patch)
tree5b997adac55f47b52f30022013b8ec3b2c10bcc5 /src/vapid
parent0ef69c7d256380e660edc45ace7f1d6151226340 (diff)
parent6bab5b4405c9adafb2ce76540595a62eea80acc0 (diff)
Integrate the prototype push notification support.
We're going to move forwards with this for now, as low-utility as it is, so that we can more easily iterate on it in a real-world environment (hi.grimoire.ca).
Diffstat (limited to 'src/vapid')
-rw-r--r--src/vapid/app.rs117
-rw-r--r--src/vapid/event.rs37
-rw-r--r--src/vapid/history.rs55
-rw-r--r--src/vapid/middleware.rs17
-rw-r--r--src/vapid/mod.rs10
-rw-r--r--src/vapid/repo.rs161
-rw-r--r--src/vapid/ser.rs63
7 files changed, 460 insertions, 0 deletions
diff --git a/src/vapid/app.rs b/src/vapid/app.rs
new file mode 100644
index 0000000..9949aa5
--- /dev/null
+++ b/src/vapid/app.rs
@@ -0,0 +1,117 @@
+use chrono::TimeDelta;
+use sqlx::SqlitePool;
+
+use super::{History, repo, repo::Provider as _};
+use crate::{
+ clock::DateTime,
+ db::NotFound as _,
+ event::{Broadcaster, Sequence, repo::Provider},
+ push::repo::Provider as _,
+};
+
+pub struct Vapid {
+ db: SqlitePool,
+ events: Broadcaster,
+}
+
+impl Vapid {
+ pub const fn new(db: SqlitePool, events: Broadcaster) -> Self {
+ Self { db, events }
+ }
+
+ pub async fn rotate_key(&self) -> Result<(), sqlx::Error> {
+ let mut tx = self.db.begin().await?;
+ // This is called from a separate CLI utility (see `cli.rs`), and we _can't_ deliver events
+ // to active clients from another process, so don't do anything that would require us to
+ // send events, like generating a new key.
+ //
+ // Instead, the server's next `refresh_key` call will generate a key and notify clients
+ // of the change. All we have to do is remove the existing key, so that the server can know
+ // to do so.
+ tx.vapid().clear().await?;
+ // Delete outstanding subscriptions for the existing VAPID key, as well. They're
+ // unserviceable once we lose the key. Clients can resubscribe when they process the next
+ // key rotation event, which will be quite quickly once the running server notices that the
+ // VAPID key has been removed.
+ tx.push().clear().await?;
+ tx.commit().await?;
+
+ Ok(())
+ }
+
+ pub async fn refresh_key(&self, ensure_at: &DateTime) -> Result<(), Error> {
+ let mut tx = self.db.begin().await?;
+ let key = tx.vapid().current().await.optional()?;
+ if key.is_none() {
+ let changed_at = tx.sequence().next(ensure_at).await?;
+ let (key, secret) = History::begin(&changed_at);
+
+ tx.vapid().clear().await?;
+ tx.vapid().store_signing_key(&secret).await?;
+
+ let events = key.events().filter(Sequence::start_from(changed_at));
+ tx.vapid().record_events(events.clone()).await?;
+
+ tx.commit().await?;
+
+ self.events.broadcast_from(events);
+ } else if let Some(key) = key
+ // Somewhat arbitrarily, rotate keys every 30 days.
+ && key.older_than(ensure_at.to_owned() - TimeDelta::days(30))
+ {
+ // If you can think of a way to factor out this duplication, be my guest. I tried.
+ // The only approach I could think of mirrors `crate::user::create::Create`, encoding
+ // the process in a state machine made of types, and that's a very complex solution
+ // to a problem that doesn't seem to merit it. -o
+ let changed_at = tx.sequence().next(ensure_at).await?;
+ let (key, secret) = key.rotate(&changed_at);
+
+ // This will delete _all_ stored subscriptions. This is fine; they're all for the
+ // current VAPID key, and we won't be able to use them anyways once the key is rotated.
+ // We have no way to inform the push broker services of that, unfortunately.
+ tx.push().clear().await?;
+ tx.vapid().clear().await?;
+ tx.vapid().store_signing_key(&secret).await?;
+
+ // Refactoring constraint: this `events` iterator borrows `key`. Anything that moves
+ // `key` has to give it back, but it can't give both `key` back and an event iterator
+ // borrowing from `key` because Rust doesn't support types that borrow from other
+ // parts of themselves.
+ let events = key.events().filter(Sequence::start_from(changed_at));
+ tx.vapid().record_events(events.clone()).await?;
+
+ // Refactoring constraint: we _really_ want to commit the transaction before we send
+ // out events, so that anything acting on those events is guaranteed to see the state
+ // of the service at some point at or after the side effects of this. I'd also prefer
+ // to keep the commit in the same method that the transaction is begun in, for clarity.
+ tx.commit().await?;
+
+ self.events.broadcast_from(events);
+ }
+ // else, the key exists and is not stale. Don't bother allocating a sequence number, and
+ // in fact throw away the whole transaction.
+
+ Ok(())
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum Error {
+ Database(#[from] sqlx::Error),
+ Ecdsa(#[from] p256::ecdsa::Error),
+ Pkcs8(#[from] p256::pkcs8::Error),
+ WebPush(#[from] web_push::WebPushError),
+}
+
+impl From<repo::Error> for Error {
+ fn from(error: repo::Error) -> Self {
+ use repo::Error;
+ match error {
+ Error::Database(error) => error.into(),
+ Error::Ecdsa(error) => error.into(),
+ Error::Pkcs8(error) => error.into(),
+ Error::WebPush(error) => error.into(),
+ }
+ }
+}
diff --git a/src/vapid/event.rs b/src/vapid/event.rs
new file mode 100644
index 0000000..cf3be77
--- /dev/null
+++ b/src/vapid/event.rs
@@ -0,0 +1,37 @@
+use p256::ecdsa::VerifyingKey;
+
+use crate::event::{Instant, Sequenced};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+#[serde(tag = "event", rename_all = "snake_case")]
+pub enum Event {
+ Changed(Changed),
+}
+
+impl Sequenced for Event {
+ fn instant(&self) -> Instant {
+ match self {
+ Self::Changed(event) => event.instant(),
+ }
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Changed {
+ #[serde(flatten)]
+ pub instant: Instant,
+ #[serde(with = "crate::vapid::ser::key")]
+ pub key: VerifyingKey,
+}
+
+impl From<Changed> for Event {
+ fn from(event: Changed) -> Self {
+ Self::Changed(event)
+ }
+}
+
+impl Sequenced for Changed {
+ fn instant(&self) -> Instant {
+ self.instant
+ }
+}
diff --git a/src/vapid/history.rs b/src/vapid/history.rs
new file mode 100644
index 0000000..42f062b
--- /dev/null
+++ b/src/vapid/history.rs
@@ -0,0 +1,55 @@
+use p256::ecdsa::{SigningKey, VerifyingKey};
+use rand::thread_rng;
+
+use super::event::{Changed, Event};
+use crate::{clock::DateTime, event::Instant};
+
+#[derive(Debug)]
+pub struct History {
+ pub key: VerifyingKey,
+ pub changed: Instant,
+}
+
+// Lifecycle interface
+impl History {
+ pub fn begin(changed: &Instant) -> (Self, SigningKey) {
+ let key = SigningKey::random(&mut thread_rng());
+ (
+ Self {
+ key: VerifyingKey::from(&key),
+ changed: *changed,
+ },
+ key,
+ )
+ }
+
+ // `self` _is_ unused here, clippy is right about that. This choice is deliberate, however - it
+ // makes it harder to inadvertently reuse a rotated key via its history, and it makes the
+ // lifecycle interface more obviously consistent between this and other History types.
+ #[allow(clippy::unused_self)]
+ pub fn rotate(self, changed: &Instant) -> (Self, SigningKey) {
+ Self::begin(changed)
+ }
+}
+
+// State interface
+impl History {
+ pub fn older_than(&self, when: DateTime) -> bool {
+ self.changed.at < when
+ }
+}
+
+// Events interface
+impl History {
+ pub fn events(&self) -> impl Iterator<Item = Event> + Clone {
+ [self.changed()].into_iter()
+ }
+
+ fn changed(&self) -> Event {
+ Changed {
+ key: self.key,
+ instant: self.changed,
+ }
+ .into()
+ }
+}
diff --git a/src/vapid/middleware.rs b/src/vapid/middleware.rs
new file mode 100644
index 0000000..3129aa7
--- /dev/null
+++ b/src/vapid/middleware.rs
@@ -0,0 +1,17 @@
+use axum::{
+ extract::{Request, State},
+ middleware::Next,
+ response::Response,
+};
+
+use crate::{clock::RequestedAt, error::Internal, vapid::app::Vapid};
+
+pub async fn middleware(
+ State(vapid): State<Vapid>,
+ RequestedAt(now): RequestedAt,
+ request: Request,
+ next: Next,
+) -> Result<Response, Internal> {
+ vapid.refresh_key(&now).await?;
+ Ok(next.run(request).await)
+}
diff --git a/src/vapid/mod.rs b/src/vapid/mod.rs
new file mode 100644
index 0000000..364f602
--- /dev/null
+++ b/src/vapid/mod.rs
@@ -0,0 +1,10 @@
+pub mod app;
+pub mod event;
+mod history;
+mod middleware;
+pub mod repo;
+pub mod ser;
+
+pub use event::Event;
+pub use history::History;
+pub use middleware::middleware;
diff --git a/src/vapid/repo.rs b/src/vapid/repo.rs
new file mode 100644
index 0000000..9db61e1
--- /dev/null
+++ b/src/vapid/repo.rs
@@ -0,0 +1,161 @@
+use std::io::Cursor;
+
+use p256::{
+ ecdsa::SigningKey,
+ pkcs8::{DecodePrivateKey as _, EncodePrivateKey as _, LineEnding},
+};
+use sqlx::{Sqlite, SqliteConnection, Transaction};
+use web_push::{PartialVapidSignatureBuilder, VapidSignatureBuilder};
+
+use super::{
+ History,
+ event::{Changed, Event},
+};
+use crate::{
+ clock::DateTime,
+ db::NotFound,
+ event::{Instant, Sequence},
+};
+
+pub trait Provider {
+ fn vapid(&mut self) -> Vapid<'_>;
+}
+
+impl Provider for Transaction<'_, Sqlite> {
+ fn vapid(&mut self) -> Vapid<'_> {
+ Vapid(self)
+ }
+}
+
+pub struct Vapid<'a>(&'a mut SqliteConnection);
+
+impl Vapid<'_> {
+ pub async fn record_events(
+ &mut self,
+ events: impl IntoIterator<Item = Event>,
+ ) -> Result<(), sqlx::Error> {
+ for event in events {
+ self.record_event(&event).await?;
+ }
+ Ok(())
+ }
+
+ pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> {
+ match event {
+ Event::Changed(changed) => self.record_changed(changed).await,
+ }
+ }
+
+ async fn record_changed(&mut self, changed: &Changed) -> Result<(), sqlx::Error> {
+ sqlx::query!(
+ r#"
+ insert into vapid_key (changed_at, changed_sequence)
+ values ($1, $2)
+ "#,
+ changed.instant.at,
+ changed.instant.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
+ pub async fn clear(&mut self) -> Result<(), sqlx::Error> {
+ sqlx::query!(
+ r#"
+ delete from vapid_key
+ "#
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ delete from vapid_signing_key
+ "#
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
+ pub async fn store_signing_key(&mut self, key: &SigningKey) -> Result<(), Error> {
+ let key = key.to_pkcs8_pem(LineEnding::CRLF)?;
+ let key = key.as_str();
+ sqlx::query!(
+ r#"
+ insert into vapid_signing_key (key)
+ values ($1)
+ "#,
+ key,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
+ pub async fn current(&mut self) -> Result<History, Error> {
+ let key = sqlx::query!(
+ r#"
+ select
+ key.changed_at as "changed_at: DateTime",
+ key.changed_sequence as "changed_sequence: Sequence",
+ signing.key
+ from vapid_key as key
+ join vapid_signing_key as signing
+ "#
+ )
+ .map(|row| {
+ let key = SigningKey::from_pkcs8_pem(&row.key)?;
+ let key = key.verifying_key().to_owned();
+
+ let changed = Instant::new(row.changed_at, row.changed_sequence);
+
+ Ok::<_, Error>(History { key, changed })
+ })
+ .fetch_one(&mut *self.0)
+ .await??;
+
+ Ok(key)
+ }
+
+ pub async fn signer(&mut self) -> Result<PartialVapidSignatureBuilder, Error> {
+ let key = sqlx::query_scalar!(
+ r#"
+ select key
+ from vapid_signing_key
+ "#
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+ let key = Cursor::new(&key);
+ let signer = VapidSignatureBuilder::from_pem_no_sub(key)?;
+
+ Ok(signer)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum Error {
+ Ecdsa(#[from] p256::ecdsa::Error),
+ Pkcs8(#[from] p256::pkcs8::Error),
+ WebPush(#[from] web_push::WebPushError),
+ Database(#[from] sqlx::Error),
+}
+
+impl<T> NotFound for Result<T, Error> {
+ type Ok = T;
+ type Error = Error;
+
+ fn optional(self) -> Result<Option<T>, Error> {
+ match self {
+ Ok(value) => Ok(Some(value)),
+ Err(Error::Database(sqlx::Error::RowNotFound)) => Ok(None),
+ Err(other) => Err(other),
+ }
+ }
+}
diff --git a/src/vapid/ser.rs b/src/vapid/ser.rs
new file mode 100644
index 0000000..02c77e1
--- /dev/null
+++ b/src/vapid/ser.rs
@@ -0,0 +1,63 @@
+pub mod key {
+ use std::fmt;
+
+ use base64::{Engine as _, engine::general_purpose::URL_SAFE};
+ use p256::ecdsa::VerifyingKey;
+ use serde::{Deserializer, Serialize as _, de};
+
+ // This serialization - to a URL-safe base-64-encoded string and back - is based on my best
+ // understanding of RFC 8292 and the corresponding browser APIs. Particularly, it's based on
+ // section 3.2:
+ //
+ // > The "k" parameter includes an ECDSA public key [FIPS186] in uncompressed form [X9.62] that
+ // > is encoded using base64url encoding [RFC7515].
+ //
+ // <https://datatracker.ietf.org/doc/html/rfc8292#section-3.2>
+ //
+ // I believe this is also supported by MDN's explanation:
+ //
+ // > `applicationServerKey`
+ // >
+ // > A Base64-encoded string or ArrayBuffer containing an ECDSA P-256 public key that the push
+ // > server will use to authenticate your application server. If specified, all messages from
+ // > your application server must use the VAPID authentication scheme, and include a JWT signed
+ // > with the corresponding private key. This key IS NOT the same ECDH key that you use to
+ // > encrypt the data. For more information, see "Using VAPID with WebPush".
+ //
+ // <https://developer.mozilla.org/en-US/docs/Web/API/PushManager/subscribe#applicationserverkey>
+
+ pub fn serialize<S>(key: &VerifyingKey, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ let key = key.to_sec1_bytes();
+ let key = URL_SAFE.encode(key);
+ key.serialize(serializer)
+ }
+
+ pub fn deserialize<'de, D>(deserializer: D) -> Result<VerifyingKey, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ deserializer.deserialize_str(Visitor)
+ }
+
+ struct Visitor;
+ impl de::Visitor<'_> for Visitor {
+ type Value = VerifyingKey;
+
+ fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+ formatter.write_str("a string containing a VAPID key")
+ }
+
+ fn visit_str<E>(self, key: &str) -> Result<Self::Value, E>
+ where
+ E: de::Error,
+ {
+ let key = URL_SAFE.decode(key).map_err(E::custom)?;
+ let key = VerifyingKey::from_sec1_bytes(&key).map_err(E::custom)?;
+
+ Ok(key)
+ }
+ }
+}