diff options
Diffstat (limited to 'src/event')
| -rw-r--r-- | src/event/app.rs | 22 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/mod.rs | 1 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/vapid.rs | 111 | ||||
| -rw-r--r-- | src/event/mod.rs | 10 |
4 files changed, 143 insertions, 1 deletions
diff --git a/src/event/app.rs b/src/event/app.rs index 8fa760a..6c657c7 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -8,9 +8,12 @@ use sqlx::sqlite::SqlitePool; use super::{Event, Sequence, Sequenced, broadcaster::Broadcaster}; use crate::{ conversation::{self, repo::Provider as _}, + db::NotFound, message::{self, repo::Provider as _}, name, user::{self, repo::Provider as _}, + vapid, + vapid::repo::Provider as _, }; pub struct Events { @@ -57,9 +60,17 @@ impl Events { .filter(Sequence::after(resume_at)) .map(Event::from); + let vapid = tx.vapid().current().await.optional()?; + let vapid_events = vapid + .iter() + .flat_map(vapid::History::events) + .filter(Sequence::after(resume_at)) + .map(Event::from); + let replay_events = user_events .merge_by(conversation_events, Sequence::merge) .merge_by(message_events, Sequence::merge) + .merge_by(vapid_events, Sequence::merge) .collect::<Vec<_>>(); let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence); @@ -86,6 +97,7 @@ impl Events { pub enum Error { Database(#[from] sqlx::Error), Name(#[from] name::Error), + Ecdsa(#[from] p256::ecdsa::Error), } impl From<user::repo::LoadError> for Error { @@ -107,3 +119,13 @@ impl From<conversation::repo::LoadError> for Error { } } } + +impl From<vapid::repo::Error> for Error { + fn from(error: vapid::repo::Error) -> Self { + use vapid::repo::Error; + match error { + Error::Database(error) => error.into(), + Error::Ecdsa(error) => error.into(), + } + } +} diff --git a/src/event/handlers/stream/test/mod.rs b/src/event/handlers/stream/test/mod.rs index 3bc634f..c3a6ce6 100644 --- a/src/event/handlers/stream/test/mod.rs +++ b/src/event/handlers/stream/test/mod.rs @@ -4,5 +4,6 @@ mod message; mod resume; mod setup; mod token; +mod vapid; use super::{QueryParams, Response, handler}; diff --git a/src/event/handlers/stream/test/vapid.rs b/src/event/handlers/stream/test/vapid.rs new file mode 100644 index 0000000..dbc3929 --- /dev/null +++ b/src/event/handlers/stream/test/vapid.rs @@ -0,0 +1,111 @@ +use axum::extract::State; +use axum_extra::extract::Query; +use futures::StreamExt as _; + +use crate::test::{fixtures, fixtures::future::Expect as _}; + +#[tokio::test] +async fn live_vapid_key_changes() { + // Set up the context + let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Subscribe to events + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Rotate the VAPID key + + app.vapid() + .refresh_key(&fixtures::now()) + .await + .expect("refreshing the vapid key always succeeds"); + + // Verify that there's a key rotation event + + events + .filter_map(fixtures::event::stream::vapid) + .filter_map(fixtures::event::stream::vapid::changed) + .next() + .expect_some("a vapid key change event is sent") + .await; +} + +#[tokio::test] +async fn stored_vapid_key_changes() { + // Set up the context + let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; + + // Rotate the VAPID key + + app.vapid() + .refresh_key(&fixtures::now()) + .await + .expect("refreshing the vapid key always succeeds"); + + // Subscribe to events + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify that there's a key rotation event + + events + .filter_map(fixtures::event::stream::vapid) + .filter_map(fixtures::event::stream::vapid::changed) + .next() + .expect_some("a vapid key change event is sent") + .await; +} + +#[tokio::test] +async fn no_past_vapid_key_changes() { + // Set up the context + let app = fixtures::scratch_app().await; + + // Rotate the VAPID key + + app.vapid() + .refresh_key(&fixtures::now()) + .await + .expect("refreshing the vapid key always succeeds"); + + // Subscribe to events + + let resume_point = fixtures::boot::resume_point(&app).await; + + let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; + let super::Response(events) = super::handler( + State(app.clone()), + subscriber, + None, + Query(super::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); + + // Verify that there's a key rotation event + + events + .filter_map(fixtures::event::stream::vapid) + .filter_map(fixtures::event::stream::vapid::changed) + .next() + .expect_wait("a vapid key change event is not sent") + .await; +} diff --git a/src/event/mod.rs b/src/event/mod.rs index f41dc9c..83b0ce7 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -2,7 +2,7 @@ use std::time::Duration; use axum::response::sse::{self, KeepAlive}; -use crate::{conversation, message, user}; +use crate::{conversation, message, user, vapid}; pub mod app; mod broadcaster; @@ -22,6 +22,7 @@ pub enum Event { User(user::Event), Conversation(conversation::Event), Message(message::Event), + Vapid(vapid::Event), } // Serialized representation is intended to look like the serialized representation of `Event`, @@ -40,6 +41,7 @@ impl Sequenced for Event { Self::User(event) => event.instant(), Self::Conversation(event) => event.instant(), Self::Message(event) => event.instant(), + Self::Vapid(event) => event.instant(), } } } @@ -62,6 +64,12 @@ impl From<message::Event> for Event { } } +impl From<vapid::Event> for Event { + fn from(event: vapid::Event) -> Self { + Self::Vapid(event) + } +} + impl Heartbeat { // The following values are a first-rough-guess attempt to balance noticing connection problems // quickly with managing the (modest) costs of delivering and processing heartbeats. Feel |
