Diffstat (limited to 'src')
-rw-r--r--  src/app.rs                                 5
-rw-r--r--  src/boot/app.rs                           23
-rw-r--r--  src/boot/handlers/boot/test.rs            68
-rw-r--r--  src/event/app.rs                          22
-rw-r--r--  src/event/handlers/stream/test/mod.rs      1
-rw-r--r--  src/event/handlers/stream/test/vapid.rs  111
-rw-r--r--  src/event/mod.rs                          10
-rw-r--r--  src/lib.rs                                 1
-rw-r--r--  src/routes.rs                              8
-rw-r--r--  src/test/fixtures/event/mod.rs            21
-rw-r--r--  src/test/fixtures/event/stream.rs         17
-rw-r--r--  src/vapid/app.rs                          89
-rw-r--r--  src/vapid/event.rs                        48
-rw-r--r--  src/vapid/history.rs                      55
-rw-r--r--  src/vapid/middleware.rs                   17
-rw-r--r--  src/vapid/mod.rs                           9
-rw-r--r--  src/vapid/repo.rs                        139
17 files changed, 641 insertions, 3 deletions
diff --git a/src/app.rs b/src/app.rs
index e61672f..0f3f6ad 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -11,6 +11,7 @@ use crate::{
message::app::Messages,
setup::app::Setup,
token::{self, app::Tokens},
+ vapid::app::Vapid,
};
#[derive(Clone)]
@@ -69,4 +70,8 @@ impl App {
pub const fn users(&self) -> Users<'_> {
Users::new(&self.db, &self.events)
}
+
+ pub const fn vapid(&self) -> Vapid<'_> {
+ Vapid::new(&self.db, &self.events)
+ }
}
diff --git a/src/boot/app.rs b/src/boot/app.rs
index 0ed5d1b..543429f 100644
--- a/src/boot/app.rs
+++ b/src/boot/app.rs
@@ -4,10 +4,12 @@ use sqlx::sqlite::SqlitePool;
use super::Snapshot;
use crate::{
conversation::{self, repo::Provider as _},
+ db::NotFound,
event::{Event, Sequence, repo::Provider as _},
message::{self, repo::Provider as _},
name,
user::{self, repo::Provider as _},
+ vapid::{self, repo::Provider as _},
};
pub struct Boot<'a> {
@@ -26,6 +28,7 @@ impl<'a> Boot<'a> {
let users = tx.users().all(resume_point).await?;
let conversations = tx.conversations().all(resume_point).await?;
let messages = tx.messages().all(resume_point).await?;
+ let vapid = tx.vapid().current().await.optional()?;
tx.commit().await?;
@@ -50,9 +53,16 @@ impl<'a> Boot<'a> {
.filter(Sequence::up_to(resume_point))
.map(Event::from);
+ let vapid_events = vapid
+ .iter()
+ .flat_map(vapid::History::events)
+ .filter(Sequence::up_to(resume_point))
+ .map(Event::from);
+
let events = user_events
.merge_by(conversation_events, Sequence::merge)
.merge_by(message_events, Sequence::merge)
+ .merge_by(vapid_events, Sequence::merge)
.collect();
Ok(Snapshot {
@@ -65,8 +75,9 @@ impl<'a> Boot<'a> {
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum Error {
- Name(#[from] name::Error),
Database(#[from] sqlx::Error),
+ Name(#[from] name::Error),
+ Ecdsa(#[from] p256::ecdsa::Error),
}
impl From<user::repo::LoadError> for Error {
@@ -88,3 +99,13 @@ impl From<conversation::repo::LoadError> for Error {
}
}
}
+
+impl From<vapid::repo::Error> for Error {
+ fn from(error: vapid::repo::Error) -> Self {
+ use vapid::repo::Error;
+ match error {
+ Error::Database(error) => error.into(),
+ Error::Ecdsa(error) => error.into(),
+ }
+ }
+}
diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs
index cb50442..3c09b0f 100644
--- a/src/boot/handlers/boot/test.rs
+++ b/src/boot/handlers/boot/test.rs
@@ -81,6 +81,74 @@ async fn includes_messages() {
}
#[tokio::test]
+async fn includes_vapid_key() {
+ let app = fixtures::scratch_app().await;
+
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("key rotation always succeeds");
+
+ let viewer = fixtures::identity::fictitious();
+ let response = super::handler(State(app), viewer)
+ .await
+ .expect("boot always succeeds");
+
+ response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::vapid)
+ .filter_map(fixtures::event::vapid::changed)
+ .exactly_one()
+ .expect("only one vapid key has been created");
+}
+
+#[tokio::test]
+async fn includes_only_latest_vapid_key() {
+ let app = fixtures::scratch_app().await;
+
+ app.vapid()
+ .refresh_key(&fixtures::ancient())
+ .await
+ .expect("key rotation always succeeds");
+
+ let viewer = fixtures::identity::fictitious();
+ let response = super::handler(State(app.clone()), viewer.clone())
+ .await
+ .expect("boot always succeeds");
+
+ let original_key = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::vapid)
+ .filter_map(fixtures::event::vapid::changed)
+ .exactly_one()
+ .expect("only one vapid key has been created");
+
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("key rotation always succeeds");
+
+ let response = super::handler(State(app), viewer)
+ .await
+ .expect("boot always succeeds");
+
+ let rotated_key = response
+ .snapshot
+ .events
+ .into_iter()
+ .filter_map(fixtures::event::vapid)
+ .filter_map(fixtures::event::vapid::changed)
+ .exactly_one()
+ .expect("only one vapid key should be returned");
+
+ assert_ne!(original_key, rotated_key);
+}
+
+#[tokio::test]
async fn includes_expired_messages() {
let app = fixtures::scratch_app().await;
let sender = fixtures::user::create(&app, &fixtures::ancient()).await;
diff --git a/src/event/app.rs b/src/event/app.rs
index 7359bfb..fe90465 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -8,9 +8,12 @@ use sqlx::sqlite::SqlitePool;
use super::{Event, Sequence, Sequenced, broadcaster::Broadcaster};
use crate::{
conversation::{self, repo::Provider as _},
+ db::NotFound,
message::{self, repo::Provider as _},
name,
user::{self, repo::Provider as _},
+ vapid,
+ vapid::repo::Provider as _,
};
pub struct Events<'a> {
@@ -57,9 +60,17 @@ impl<'a> Events<'a> {
.filter(Sequence::after(resume_at))
.map(Event::from);
+ let vapid = tx.vapid().current().await.optional()?;
+ let vapid_events = vapid
+ .iter()
+ .flat_map(vapid::History::events)
+ .filter(Sequence::after(resume_at))
+ .map(Event::from);
+
let replay_events = user_events
.merge_by(conversation_events, Sequence::merge)
.merge_by(message_events, Sequence::merge)
+ .merge_by(vapid_events, Sequence::merge)
.collect::<Vec<_>>();
let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence);
@@ -86,6 +97,7 @@ impl<'a> Events<'a> {
pub enum Error {
Database(#[from] sqlx::Error),
Name(#[from] name::Error),
+ Ecdsa(#[from] p256::ecdsa::Error),
}
impl From<user::repo::LoadError> for Error {
@@ -107,3 +119,13 @@ impl From<conversation::repo::LoadError> for Error {
}
}
}
+
+impl From<vapid::repo::Error> for Error {
+ fn from(error: vapid::repo::Error) -> Self {
+ use vapid::repo::Error;
+ match error {
+ Error::Database(error) => error.into(),
+ Error::Ecdsa(error) => error.into(),
+ }
+ }
+}
diff --git a/src/event/handlers/stream/test/mod.rs b/src/event/handlers/stream/test/mod.rs
index 3bc634f..c3a6ce6 100644
--- a/src/event/handlers/stream/test/mod.rs
+++ b/src/event/handlers/stream/test/mod.rs
@@ -4,5 +4,6 @@ mod message;
mod resume;
mod setup;
mod token;
+mod vapid;
use super::{QueryParams, Response, handler};
diff --git a/src/event/handlers/stream/test/vapid.rs b/src/event/handlers/stream/test/vapid.rs
new file mode 100644
index 0000000..dbc3929
--- /dev/null
+++ b/src/event/handlers/stream/test/vapid.rs
@@ -0,0 +1,111 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::StreamExt as _;
+
+use crate::test::{fixtures, fixtures::future::Expect as _};
+
+#[tokio::test]
+async fn live_vapid_key_changes() {
+ // Set up the context
+ let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe to events
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Rotate the VAPID key
+
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("refreshing the vapid key always succeeds");
+
+ // Verify that there's a key rotation event
+
+ events
+ .filter_map(fixtures::event::stream::vapid)
+ .filter_map(fixtures::event::stream::vapid::changed)
+ .next()
+ .expect_some("a vapid key change event is sent")
+ .await;
+}
+
+#[tokio::test]
+async fn stored_vapid_key_changes() {
+ // Set up the context
+ let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Rotate the VAPID key
+
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("refreshing the vapid key always succeeds");
+
+ // Subscribe to events
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify that there's a key rotation event
+
+ events
+ .filter_map(fixtures::event::stream::vapid)
+ .filter_map(fixtures::event::stream::vapid::changed)
+ .next()
+ .expect_some("a vapid key change event is sent")
+ .await;
+}
+
+#[tokio::test]
+async fn no_past_vapid_key_changes() {
+ // Set up the context
+ let app = fixtures::scratch_app().await;
+
+ // Rotate the VAPID key
+
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("refreshing the vapid key always succeeds");
+
+ // Subscribe to events
+
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify that no key rotation event is replayed
+
+ events
+ .filter_map(fixtures::event::stream::vapid)
+ .filter_map(fixtures::event::stream::vapid::changed)
+ .next()
+ .expect_wait("a vapid key change event is not sent")
+ .await;
+}
diff --git a/src/event/mod.rs b/src/event/mod.rs
index f41dc9c..83b0ce7 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -2,7 +2,7 @@ use std::time::Duration;
use axum::response::sse::{self, KeepAlive};
-use crate::{conversation, message, user};
+use crate::{conversation, message, user, vapid};
pub mod app;
mod broadcaster;
@@ -22,6 +22,7 @@ pub enum Event {
User(user::Event),
Conversation(conversation::Event),
Message(message::Event),
+ Vapid(vapid::Event),
}
// Serialized representation is intended to look like the serialized representation of `Event`,
@@ -40,6 +41,7 @@ impl Sequenced for Event {
Self::User(event) => event.instant(),
Self::Conversation(event) => event.instant(),
Self::Message(event) => event.instant(),
+ Self::Vapid(event) => event.instant(),
}
}
}
@@ -62,6 +64,12 @@ impl From<message::Event> for Event {
}
}
+impl From<vapid::Event> for Event {
+ fn from(event: vapid::Event) -> Self {
+ Self::Vapid(event)
+ }
+}
+
impl Heartbeat {
// The following values are a first-rough-guess attempt to balance noticing connection problems
// quickly with managing the (modest) costs of delivering and processing heartbeats. Feel
diff --git a/src/lib.rs b/src/lib.rs
index f05cce3..6b2a83c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -28,3 +28,4 @@ mod token;
mod ui;
mod umask;
mod user;
+mod vapid;
diff --git a/src/routes.rs b/src/routes.rs
index b848afb..2979abe 100644
--- a/src/routes.rs
+++ b/src/routes.rs
@@ -4,7 +4,9 @@ use axum::{
routing::{delete, get, post},
};
-use crate::{app::App, boot, conversation, event, expire, invite, login, message, setup, ui};
+use crate::{
+ app::App, boot, conversation, event, expire, invite, login, message, setup, ui, vapid,
+};
pub fn routes(app: &App) -> Router<App> {
// UI routes that can be accessed before the administrator completes setup.
@@ -56,6 +58,10 @@ pub fn routes(app: &App) -> Router<App> {
app.clone(),
expire::middleware,
))
+ .route_layer(middleware::from_fn_with_state(
+ app.clone(),
+ vapid::middleware,
+ ))
.route_layer(setup::Required(app.clone()));
[
diff --git a/src/test/fixtures/event/mod.rs b/src/test/fixtures/event/mod.rs
index 08b17e7..f8651ba 100644
--- a/src/test/fixtures/event/mod.rs
+++ b/src/test/fixtures/event/mod.rs
@@ -23,6 +23,13 @@ pub fn user(event: Event) -> Option<crate::user::Event> {
}
}
+pub fn vapid(event: Event) -> Option<crate::vapid::Event> {
+ match event {
+ Event::Vapid(event) => Some(event),
+ _ => None,
+ }
+}
+
pub mod conversation {
use crate::conversation::{Event, event};
@@ -72,3 +79,17 @@ pub mod user {
}
}
}
+
+pub mod vapid {
+ use crate::vapid::{Event, event};
+
+ // This could be defined as `-> event::Changed`. However, I want the interface to be consistent
+ // with the event stream transformers for other types, and we'd have to refactor the return type
+ // to `-> Option<event::Changed>` the instant VAPID keys sprout a second event.
+ #[allow(clippy::unnecessary_wraps)]
+ pub fn changed(event: Event) -> Option<event::Changed> {
+ match event {
+ Event::Changed(changed) => Some(changed),
+ }
+ }
+}
diff --git a/src/test/fixtures/event/stream.rs b/src/test/fixtures/event/stream.rs
index 5b3621d..bb83d0d 100644
--- a/src/test/fixtures/event/stream.rs
+++ b/src/test/fixtures/event/stream.rs
@@ -14,6 +14,10 @@ pub fn user(event: Event) -> Ready<Option<crate::user::Event>> {
future::ready(event::user(event))
}
+pub fn vapid(event: Event) -> Ready<Option<crate::vapid::Event>> {
+ future::ready(event::vapid(event))
+}
+
pub mod conversation {
use std::future::{self, Ready};
@@ -60,3 +64,16 @@ pub mod user {
future::ready(user::created(event))
}
}
+
+pub mod vapid {
+ use std::future::{self, Ready};
+
+ use crate::{
+ test::fixtures::event::vapid,
+ vapid::{Event, event},
+ };
+
+ pub fn changed(event: Event) -> Ready<Option<event::Changed>> {
+ future::ready(vapid::changed(event))
+ }
+}
diff --git a/src/vapid/app.rs b/src/vapid/app.rs
new file mode 100644
index 0000000..8886c9f
--- /dev/null
+++ b/src/vapid/app.rs
@@ -0,0 +1,89 @@
+use chrono::TimeDelta;
+use sqlx::SqlitePool;
+
+use super::{History, repo, repo::Provider as _};
+use crate::{
+ clock::DateTime,
+ db::NotFound as _,
+ event::{Broadcaster, Sequence, repo::Provider},
+};
+
+pub struct Vapid<'a> {
+ db: &'a SqlitePool,
+ events: &'a Broadcaster,
+}
+
+impl<'a> Vapid<'a> {
+ pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self {
+ Self { db, events }
+ }
+
+ pub async fn refresh_key(&self, ensure_at: &DateTime) -> Result<(), Error> {
+ let mut tx = self.db.begin().await?;
+ let key = tx.vapid().current().await.optional()?;
+ if key.is_none() {
+ let changed_at = tx.sequence().next(ensure_at).await?;
+ let (key, secret) = History::begin(&changed_at);
+
+ tx.vapid().clear().await?;
+ tx.vapid().store_signing_key(&secret).await?;
+
+ let events = key.events().filter(Sequence::start_from(changed_at));
+ tx.vapid().record_events(events.clone()).await?;
+
+ tx.commit().await?;
+
+ self.events.broadcast_from(events);
+ } else if let Some(key) = key
+ // Somewhat arbitrarily, rotate keys every 30 days.
+ && key.older_than(ensure_at.to_owned() - TimeDelta::days(30))
+ {
+ // If you can think of a way to factor out this duplication, be my guest. I tried.
+ // The only approach I could think of mirrors `crate::user::create::Create`, encoding
+ // the process in a state machine made of types, and that's a very complex solution
+ // to a problem that doesn't seem to merit it. -o
+ let changed_at = tx.sequence().next(ensure_at).await?;
+ let (key, secret) = key.rotate(&changed_at);
+
+ tx.vapid().clear().await?;
+ tx.vapid().store_signing_key(&secret).await?;
+
+ // Refactoring constraint: this `events` iterator borrows `key`. Anything that moves
+ // `key` has to give it back, but it can't give both `key` back and an event iterator
+ // borrowing from `key` because Rust doesn't support types that borrow from other
+ // parts of themselves.
+ let events = key.events().filter(Sequence::start_from(changed_at));
+ tx.vapid().record_events(events.clone()).await?;
+
+ // Refactoring constraint: we _really_ want to commit the transaction before we send
+ // out events, so that anything acting on those events is guaranteed to see the state
+ // of the service at some point at or after the side effects of this. I'd also prefer
+ // to keep the commit in the same method that the transaction is begun in, for clarity.
+ tx.commit().await?;
+
+ self.events.broadcast_from(events);
+ }
+ // else, the key exists and is not stale. Don't bother allocating a sequence number, and
+ // in fact throw away the whole transaction.
+
+ Ok(())
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ #[error(transparent)]
+ Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Ecdsa(#[from] p256::ecdsa::Error),
+}
+
+impl From<repo::Error> for Error {
+ fn from(error: repo::Error) -> Self {
+ use repo::Error;
+ match error {
+ Error::Database(error) => error.into(),
+ Error::Ecdsa(error) => error.into(),
+ }
+ }
+}
diff --git a/src/vapid/event.rs b/src/vapid/event.rs
new file mode 100644
index 0000000..af70ac2
--- /dev/null
+++ b/src/vapid/event.rs
@@ -0,0 +1,48 @@
+use base64::{Engine, engine::general_purpose::URL_SAFE};
+use p256::ecdsa::VerifyingKey;
+use serde::Serialize;
+
+use crate::event::{Instant, Sequenced};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+#[serde(tag = "event", rename_all = "snake_case")]
+pub enum Event {
+ Changed(Changed),
+}
+
+impl Sequenced for Event {
+ fn instant(&self) -> Instant {
+ match self {
+ Self::Changed(event) => event.instant(),
+ }
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Changed {
+ #[serde(flatten)]
+ pub instant: Instant,
+ #[serde(serialize_with = "as_vapid_key")]
+ pub key: VerifyingKey,
+}
+
+impl From<Changed> for Event {
+ fn from(event: Changed) -> Self {
+ Self::Changed(event)
+ }
+}
+
+impl Sequenced for Changed {
+ fn instant(&self) -> Instant {
+ self.instant
+ }
+}
+
+fn as_vapid_key<S>(key: &VerifyingKey, serializer: S) -> Result<S::Ok, S::Error>
+where
+ S: serde::Serializer,
+{
+ let key = key.to_sec1_bytes();
+ let key = URL_SAFE.encode(key);
+ key.serialize(serializer)
+}
diff --git a/src/vapid/history.rs b/src/vapid/history.rs
new file mode 100644
index 0000000..42f062b
--- /dev/null
+++ b/src/vapid/history.rs
@@ -0,0 +1,55 @@
+use p256::ecdsa::{SigningKey, VerifyingKey};
+use rand::thread_rng;
+
+use super::event::{Changed, Event};
+use crate::{clock::DateTime, event::Instant};
+
+#[derive(Debug)]
+pub struct History {
+ pub key: VerifyingKey,
+ pub changed: Instant,
+}
+
+// Lifecycle interface
+impl History {
+ pub fn begin(changed: &Instant) -> (Self, SigningKey) {
+ let key = SigningKey::random(&mut thread_rng());
+ (
+ Self {
+ key: VerifyingKey::from(&key),
+ changed: *changed,
+ },
+ key,
+ )
+ }
+
+ // `self` _is_ unused here, clippy is right about that. This choice is deliberate, however - it
+ // makes it harder to inadvertently reuse a rotated key via its history, and it makes the
+ // lifecycle interface more obviously consistent between this and other History types.
+ #[allow(clippy::unused_self)]
+ pub fn rotate(self, changed: &Instant) -> (Self, SigningKey) {
+ Self::begin(changed)
+ }
+}
+
+// State interface
+impl History {
+ pub fn older_than(&self, when: DateTime) -> bool {
+ self.changed.at < when
+ }
+}
+
+// Events interface
+impl History {
+ pub fn events(&self) -> impl Iterator<Item = Event> + Clone {
+ [self.changed()].into_iter()
+ }
+
+ fn changed(&self) -> Event {
+ Changed {
+ key: self.key,
+ instant: self.changed,
+ }
+ .into()
+ }
+}
diff --git a/src/vapid/middleware.rs b/src/vapid/middleware.rs
new file mode 100644
index 0000000..02951ba
--- /dev/null
+++ b/src/vapid/middleware.rs
@@ -0,0 +1,17 @@
+use axum::{
+ extract::{Request, State},
+ middleware::Next,
+ response::Response,
+};
+
+use crate::{app::App, clock::RequestedAt, error::Internal};
+
+pub async fn middleware(
+ State(app): State<App>,
+ RequestedAt(now): RequestedAt,
+ request: Request,
+ next: Next,
+) -> Result<Response, Internal> {
+ app.vapid().refresh_key(&now).await?;
+ Ok(next.run(request).await)
+}
diff --git a/src/vapid/mod.rs b/src/vapid/mod.rs
new file mode 100644
index 0000000..9798654
--- /dev/null
+++ b/src/vapid/mod.rs
@@ -0,0 +1,9 @@
+pub mod app;
+pub mod event;
+mod history;
+mod middleware;
+pub mod repo;
+
+pub use event::Event;
+pub use history::History;
+pub use middleware::middleware;
diff --git a/src/vapid/repo.rs b/src/vapid/repo.rs
new file mode 100644
index 0000000..4ac5286
--- /dev/null
+++ b/src/vapid/repo.rs
@@ -0,0 +1,139 @@
+use p256::{NistP256, ecdsa::SigningKey, elliptic_curve::FieldBytes};
+use sqlx::{Sqlite, SqliteConnection, Transaction};
+
+use super::{
+ History,
+ event::{Changed, Event},
+};
+use crate::{
+ clock::DateTime,
+ db::NotFound,
+ event::{Instant, Sequence},
+};
+
+pub trait Provider {
+ fn vapid(&mut self) -> Vapid<'_>;
+}
+
+impl Provider for Transaction<'_, Sqlite> {
+ fn vapid(&mut self) -> Vapid<'_> {
+ Vapid(self)
+ }
+}
+
+pub struct Vapid<'a>(&'a mut SqliteConnection);
+
+impl Vapid<'_> {
+ pub async fn record_events(
+ &mut self,
+ events: impl IntoIterator<Item = Event>,
+ ) -> Result<(), sqlx::Error> {
+ for event in events {
+ self.record_event(&event).await?;
+ }
+ Ok(())
+ }
+
+ pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> {
+ match event {
+ Event::Changed(changed) => self.record_changed(changed).await,
+ }
+ }
+
+ async fn record_changed(&mut self, changed: &Changed) -> Result<(), sqlx::Error> {
+ sqlx::query!(
+ r#"
+ insert into vapid_key (changed_at, changed_sequence)
+ values ($1, $2)
+ "#,
+ changed.instant.at,
+ changed.instant.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
+ pub async fn clear(&mut self) -> Result<(), sqlx::Error> {
+ sqlx::query!(
+ r#"
+ delete from vapid_key
+ "#
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ delete from vapid_signing_key
+ "#
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
+ pub async fn store_signing_key(&mut self, key: &SigningKey) -> Result<(), Error> {
+ let key = key.to_bytes();
+ let key = key.as_slice();
+ sqlx::query!(
+ r#"
+ insert into vapid_signing_key (key)
+ values ($1)
+ "#,
+ key,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
+ pub async fn current(&mut self) -> Result<History, Error> {
+ let key = sqlx::query!(
+ r#"
+ select
+ key.changed_at as "changed_at: DateTime",
+ key.changed_sequence as "changed_sequence: Sequence",
+ signing.key as "key: Vec<u8>"
+ from vapid_key as key
+ join vapid_signing_key as signing
+ "#
+ )
+ .map(|row| {
+ let key = FieldBytes::<NistP256>::from_slice(&row.key);
+ let key = SigningKey::from_bytes(key)?;
+ let key = key.verifying_key().to_owned();
+
+ let changed = Instant::new(row.changed_at, row.changed_sequence);
+
+ Ok::<_, Error>(History { key, changed })
+ })
+ .fetch_one(&mut *self.0)
+ .await??;
+
+ Ok(key)
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum Error {
+ Ecdsa(#[from] p256::ecdsa::Error),
+ Database(#[from] sqlx::Error),
+}
+
+impl<T> NotFound for Result<T, Error> {
+ type Ok = T;
+ type Error = Error;
+
+ fn optional(self) -> Result<Option<T>, Error> {
+ match self {
+ Ok(value) => Ok(Some(value)),
+ Err(Error::Database(sqlx::Error::RowNotFound)) => Ok(None),
+ Err(other) => Err(other),
+ }
+ }
+}