summaryrefslogtreecommitdiff
path: root/src/event
diff options
context:
space:
mode:
Diffstat (limited to 'src/event')
-rw-r--r--src/event/app.rs26
-rw-r--r--src/event/handlers/stream/mod.rs4
-rw-r--r--src/event/handlers/stream/test/mod.rs1
-rw-r--r--src/event/handlers/stream/test/vapid.rs111
-rw-r--r--src/event/mod.rs10
5 files changed, 149 insertions, 3 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
index 8fa760a..e422de9 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -8,9 +8,12 @@ use sqlx::sqlite::SqlitePool;
use super::{Event, Sequence, Sequenced, broadcaster::Broadcaster};
use crate::{
conversation::{self, repo::Provider as _},
+ db::NotFound,
message::{self, repo::Provider as _},
name,
user::{self, repo::Provider as _},
+ vapid,
+ vapid::repo::Provider as _,
};
pub struct Events {
@@ -57,9 +60,17 @@ impl Events {
.filter(Sequence::after(resume_at))
.map(Event::from);
+ let vapid = tx.vapid().current().await.optional()?;
+ let vapid_events = vapid
+ .iter()
+ .flat_map(vapid::History::events)
+ .filter(Sequence::after(resume_at))
+ .map(Event::from);
+
let replay_events = user_events
.merge_by(conversation_events, Sequence::merge)
.merge_by(message_events, Sequence::merge)
+ .merge_by(vapid_events, Sequence::merge)
.collect::<Vec<_>>();
let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence);
@@ -86,6 +97,9 @@ impl Events {
pub enum Error {
Database(#[from] sqlx::Error),
Name(#[from] name::Error),
+ Ecdsa(#[from] p256::ecdsa::Error),
+ Pkcs8(#[from] p256::pkcs8::Error),
+ WebPush(#[from] web_push::WebPushError),
}
impl From<user::repo::LoadError> for Error {
@@ -107,3 +121,15 @@ impl From<conversation::repo::LoadError> for Error {
}
}
}
+
+impl From<vapid::repo::Error> for Error {
+ fn from(error: vapid::repo::Error) -> Self {
+ use vapid::repo::Error;
+ match error {
+ Error::Database(error) => error.into(),
+ Error::Ecdsa(error) => error.into(),
+ Error::Pkcs8(error) => error.into(),
+ Error::WebPush(error) => error.into(),
+ }
+ }
+}
diff --git a/src/event/handlers/stream/mod.rs b/src/event/handlers/stream/mod.rs
index 63bfff3..8b89c31 100644
--- a/src/event/handlers/stream/mod.rs
+++ b/src/event/handlers/stream/mod.rs
@@ -18,8 +18,8 @@ use crate::{
#[cfg(test)]
mod test;
-pub async fn handler(
- State(app): State<App>,
+pub async fn handler<P>(
+ State(app): State<App<P>>,
identity: Identity,
last_event_id: Option<LastEventId<Sequence>>,
Query(query): Query<QueryParams>,
diff --git a/src/event/handlers/stream/test/mod.rs b/src/event/handlers/stream/test/mod.rs
index 3bc634f..c3a6ce6 100644
--- a/src/event/handlers/stream/test/mod.rs
+++ b/src/event/handlers/stream/test/mod.rs
@@ -4,5 +4,6 @@ mod message;
mod resume;
mod setup;
mod token;
+mod vapid;
use super::{QueryParams, Response, handler};
diff --git a/src/event/handlers/stream/test/vapid.rs b/src/event/handlers/stream/test/vapid.rs
new file mode 100644
index 0000000..dbc3929
--- /dev/null
+++ b/src/event/handlers/stream/test/vapid.rs
@@ -0,0 +1,111 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::StreamExt as _;
+
+use crate::test::{fixtures, fixtures::future::Expect as _};
+
+#[tokio::test]
+async fn live_vapid_key_changes() {
+ // Set up the context
+ let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Subscribe to events
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Rotate the VAPID key
+
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("refreshing the vapid key always succeeds");
+
+ // Verify that there's a key rotation event
+
+ events
+ .filter_map(fixtures::event::stream::vapid)
+ .filter_map(fixtures::event::stream::vapid::changed)
+ .next()
+ .expect_some("a vapid key change event is sent")
+ .await;
+}
+
+#[tokio::test]
+async fn stored_vapid_key_changes() {
+ // Set up the context
+ let app = fixtures::scratch_app().await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ // Rotate the VAPID key
+
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("refreshing the vapid key always succeeds");
+
+ // Subscribe to events
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify that there's a key rotation event
+
+ events
+ .filter_map(fixtures::event::stream::vapid)
+ .filter_map(fixtures::event::stream::vapid::changed)
+ .next()
+ .expect_some("a vapid key change event is sent")
+ .await;
+}
+
+#[tokio::test]
+async fn no_past_vapid_key_changes() {
+ // Set up the context
+ let app = fixtures::scratch_app().await;
+
+ // Rotate the VAPID key
+
+ app.vapid()
+ .refresh_key(&fixtures::now())
+ .await
+ .expect("refreshing the vapid key always succeeds");
+
+ // Subscribe to events
+
+ let resume_point = fixtures::boot::resume_point(&app).await;
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let super::Response(events) = super::handler(
+ State(app.clone()),
+ subscriber,
+ None,
+ Query(super::QueryParams { resume_point }),
+ )
+ .await
+ .expect("subscribe never fails");
+
+ // Verify that there's a key rotation event
+
+ events
+ .filter_map(fixtures::event::stream::vapid)
+ .filter_map(fixtures::event::stream::vapid::changed)
+ .next()
+ .expect_wait("a vapid key change event is not sent")
+ .await;
+}
diff --git a/src/event/mod.rs b/src/event/mod.rs
index f41dc9c..83b0ce7 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -2,7 +2,7 @@ use std::time::Duration;
use axum::response::sse::{self, KeepAlive};
-use crate::{conversation, message, user};
+use crate::{conversation, message, user, vapid};
pub mod app;
mod broadcaster;
@@ -22,6 +22,7 @@ pub enum Event {
User(user::Event),
Conversation(conversation::Event),
Message(message::Event),
+ Vapid(vapid::Event),
}
// Serialized representation is intended to look like the serialized representation of `Event`,
@@ -40,6 +41,7 @@ impl Sequenced for Event {
Self::User(event) => event.instant(),
Self::Conversation(event) => event.instant(),
Self::Message(event) => event.instant(),
+ Self::Vapid(event) => event.instant(),
}
}
}
@@ -62,6 +64,12 @@ impl From<message::Event> for Event {
}
}
+impl From<vapid::Event> for Event {
+ fn from(event: vapid::Event) -> Self {
+ Self::Vapid(event)
+ }
+}
+
impl Heartbeat {
// The following values are a first-rough-guess attempt to balance noticing connection problems
// quickly with managing the (modest) costs of delivering and processing heartbeats. Feel