From eff129bc1f29bcb1b2b9d10c6b49ab886edc83d6 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Sep 2024 18:17:02 -0400 Subject: Make `/api/events` a firehose endpoint. It now includes events for all channels. Clients are responsible for filtering. The schema for channel events has changed; it now includes a channel name and ID, in the same format as the sender's name and ID. They also now include a `"type"` field, whose only valid value (as of this writing) is `"message"`. This is groundwork for delivering message deletion (expiry) events to clients, and notifying clients of channel lifecycle events. --- src/test/fixtures/message.rs | 4 ++-- src/test/fixtures/mod.rs | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) (limited to 'src/test') diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs index 33feeae..bfca8cd 100644 --- a/src/test/fixtures/message.rs +++ b/src/test/fixtures/message.rs @@ -3,7 +3,7 @@ use faker_rand::lorem::Paragraphs; use crate::{ app::App, clock::RequestedAt, - events::repo::broadcast, + events::types, repo::{channel::Channel, login::Login}, }; @@ -12,7 +12,7 @@ pub async fn send( login: &Login, channel: &Channel, sent_at: &RequestedAt, -) -> broadcast::Message { +) -> types::ChannelEvent { let body = propose(); app.events() diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index a42dba5..450fbec 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -13,8 +13,6 @@ pub async fn scratch_app() -> App { .await .expect("setting up in-memory sqlite database"); App::from(pool) - .await - .expect("creating an app from a fresh, in-memory database") } pub fn now() -> RequestedAt { -- cgit v1.2.3 From 1458ff7be5d883444943090cb636e9343487d03e Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Sep 2024 20:18:50 -0400 Subject: Send created events when channels are added. --- ...8c64a38a3f73b112e74b7318ee8e52e475866d8cfd.json | 32 ++++++++++++ ...f4f89221feb030ac8f42cf594c875ecd3bfeca3eb7.json | 26 ---------- ...926096285d50afb88a326cff0ecab96058a2f6d93a.json | 32 ++++++++++++ ...2b3d9939d4edb10e0e654e2a1b19949c3427522a08.json | 26 ---------- ...b3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json | 32 ++++++++++++ ...d76f0c34c759006d269ccccd6299c66b672076449d.json | 26 ---------- docs/api.md | 10 ++++ migrations/20240928002608_channel_lifecycle.sql | 57 ++++++++++++++++++++++ src/app.rs | 2 +- src/channel/app.rs | 18 +++++-- src/channel/routes.rs | 3 +- src/channel/routes/test/list.rs | 6 +-- src/channel/routes/test/on_create.rs | 47 +++++++++++++----- src/channel/routes/test/on_send.rs | 2 +- src/events/app.rs | 24 ++++++--- src/events/routes/test.rs | 18 +++---- src/events/types.rs | 40 ++++++++++++++- src/repo/channel.rs | 30 +++++++++--- src/test/fixtures/channel.rs | 6 +-- 19 files changed, 309 insertions(+), 128 deletions(-) create mode 100644 .sqlx/query-22f313d9afcdd02df74a8b8c64a38a3f73b112e74b7318ee8e52e475866d8cfd.json delete mode 100644 .sqlx/query-79e6e76bc3f974248bf8a7f4f89221feb030ac8f42cf594c875ecd3bfeca3eb7.json create mode 100644 .sqlx/query-7ccae3dde1aba5f22cf9e3926096285d50afb88a326cff0ecab96058a2f6d93a.json delete mode 100644 .sqlx/query-8c78f7bbfb5522afa15c412b3d9939d4edb10e0e654e2a1b19949c3427522a08.json create mode 100644 .sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json delete mode 100644 .sqlx/query-dbe468d2a7f64a45e70dfbd76f0c34c759006d269ccccd6299c66b672076449d.json create mode 100644 migrations/20240928002608_channel_lifecycle.sql (limited to 'src/test') diff --git a/.sqlx/query-22f313d9afcdd02df74a8b8c64a38a3f73b112e74b7318ee8e52e475866d8cfd.json b/.sqlx/query-22f313d9afcdd02df74a8b8c64a38a3f73b112e74b7318ee8e52e475866d8cfd.json new file mode 100644 index 0000000..3d5d06c --- /dev/null +++ b/.sqlx/query-22f313d9afcdd02df74a8b8c64a38a3f73b112e74b7318ee8e52e475866d8cfd.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\"\n from channel\n where id = $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "22f313d9afcdd02df74a8b8c64a38a3f73b112e74b7318ee8e52e475866d8cfd" +} diff --git a/.sqlx/query-79e6e76bc3f974248bf8a7f4f89221feb030ac8f42cf594c875ecd3bfeca3eb7.json b/.sqlx/query-79e6e76bc3f974248bf8a7f4f89221feb030ac8f42cf594c875ecd3bfeca3eb7.json deleted file mode 100644 index b46d940..0000000 --- a/.sqlx/query-79e6e76bc3f974248bf8a7f4f89221feb030ac8f42cf594c875ecd3bfeca3eb7.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select id as \"id: Id\", name\n from channel\n where id = $1\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "name", - "ordinal": 1, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false - ] - }, - "hash": "79e6e76bc3f974248bf8a7f4f89221feb030ac8f42cf594c875ecd3bfeca3eb7" -} diff --git a/.sqlx/query-7ccae3dde1aba5f22cf9e3926096285d50afb88a326cff0ecab96058a2f6d93a.json b/.sqlx/query-7ccae3dde1aba5f22cf9e3926096285d50afb88a326cff0ecab96058a2f6d93a.json new file mode 100644 index 0000000..4ec7118 --- /dev/null +++ b/.sqlx/query-7ccae3dde1aba5f22cf9e3926096285d50afb88a326cff0ecab96058a2f6d93a.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\"\n from channel\n order by channel.name\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "7ccae3dde1aba5f22cf9e3926096285d50afb88a326cff0ecab96058a2f6d93a" +} diff --git a/.sqlx/query-8c78f7bbfb5522afa15c412b3d9939d4edb10e0e654e2a1b19949c3427522a08.json b/.sqlx/query-8c78f7bbfb5522afa15c412b3d9939d4edb10e0e654e2a1b19949c3427522a08.json deleted file mode 100644 index 4d9051d..0000000 --- a/.sqlx/query-8c78f7bbfb5522afa15c412b3d9939d4edb10e0e654e2a1b19949c3427522a08.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n channel.id as \"id: Id\",\n channel.name\n from channel\n order by channel.name\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "name", - "ordinal": 1, - "type_info": "Text" - } - ], - "parameters": { - "Right": 0 - }, - "nullable": [ - false, - false - ] - }, - "hash": "8c78f7bbfb5522afa15c412b3d9939d4edb10e0e654e2a1b19949c3427522a08" -} diff --git a/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json b/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json new file mode 100644 index 0000000..64d56dd --- /dev/null +++ b/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n insert\n into channel (id, name, created_at)\n values ($1, $2, $3)\n returning\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 3 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33" +} diff --git a/.sqlx/query-dbe468d2a7f64a45e70dfbd76f0c34c759006d269ccccd6299c66b672076449d.json b/.sqlx/query-dbe468d2a7f64a45e70dfbd76f0c34c759006d269ccccd6299c66b672076449d.json deleted file mode 100644 index 3db94ca..0000000 --- a/.sqlx/query-dbe468d2a7f64a45e70dfbd76f0c34c759006d269ccccd6299c66b672076449d.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert\n into channel (id, name)\n values ($1, $2)\n returning id as \"id: Id\", name\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "name", - "ordinal": 1, - "type_info": "Text" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - false, - false - ] - }, - "hash": "dbe468d2a7f64a45e70dfbd76f0c34c759006d269ccccd6299c66b672076449d" -} diff --git a/docs/api.md b/docs/api.md index 8b31941..c5ee34a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -165,6 +165,16 @@ The event IDs `hi` sends in `application/event-stream` encoding are ephemeral, a The returned event stream is a sequence of events: ```json +id: 1233 +data: { +data: "type": "created" +data: "at": "2024-09-27T23:18:10.208147Z", +data: "channel": { +data: "id": "C9876cyyz", +data: "name": "example channel 2" +data: }, +data: } + id: 1234 data: { data: "type": "message", diff --git a/migrations/20240928002608_channel_lifecycle.sql b/migrations/20240928002608_channel_lifecycle.sql new file mode 100644 index 0000000..bc690d7 --- /dev/null +++ b/migrations/20240928002608_channel_lifecycle.sql @@ -0,0 +1,57 @@ +alter table channel +rename to old_channel; + +-- Add new columns +create table channel ( + id text + not null + primary key, + name text + not null + unique, + created_at text + not null +); + +-- Transfer data from original table +insert into channel +select + channel.id, + channel.name, + coalesce( + min(message.sent_at), + strftime('%FT%R:%f+00:00', 'now', 'utc') + ) as created_at +from old_channel as channel + left join message on channel.id = message.channel +group by channel.id, channel.name; + +-- Fix up `message` foreign keys +alter table message +rename to old_message; + +create table message ( + id text + not null + primary key, + sequence bigint + not null, + channel text + not null + references channel (id), + sender text + not null + references login (id), + body text + not null, + sent_at text + not null, + unique (channel, sequence) +); + +insert into message +select * from old_message; + +-- Bury the bodies respectfully +drop table old_message; +drop table old_channel; diff --git a/src/app.rs b/src/app.rs index 07b932a..245feb1 100644 --- a/src/app.rs +++ b/src/app.rs @@ -29,6 +29,6 @@ impl App { } pub const fn channels(&self) -> Channels { - Channels::new(&self.db) + Channels::new(&self.db, &self.broadcaster) } } diff --git a/src/channel/app.rs b/src/channel/app.rs index 6bad158..1eeca79 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,25 +1,33 @@ use sqlx::sqlite::SqlitePool; -use crate::repo::channel::{Channel, Provider as _}; +use crate::{ + clock::DateTime, + events::{broadcaster::Broadcaster, types::ChannelEvent}, + repo::channel::{Channel, Provider as _}, +}; pub struct Channels<'a> { db: &'a SqlitePool, + broadcaster: &'a Broadcaster, } impl<'a> Channels<'a> { - pub const fn new(db: &'a SqlitePool) -> Self { - Self { db } + pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { + Self { db, broadcaster } } - pub async fn create(&self, name: &str) -> Result { + pub async fn create(&self, name: &str, created_at: &DateTime) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() - .create(name) + .create(name, created_at) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; + self.broadcaster + .broadcast(&ChannelEvent::created(channel.clone())); + Ok(channel) } diff --git a/src/channel/routes.rs b/src/channel/routes.rs index f524e62..1f8db5a 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -52,11 +52,12 @@ struct CreateRequest { async fn on_create( State(app): State, _: Login, // requires auth, but doesn't actually care who you are + RequestedAt(created_at): RequestedAt, Json(form): Json, ) -> Result, CreateError> { let channel = app .channels() - .create(&form.name) + .create(&form.name, &created_at) .await .map_err(CreateError)?; diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs index f7f7b44..bc94024 100644 --- a/src/channel/routes/test/list.rs +++ b/src/channel/routes/test/list.rs @@ -26,7 +26,7 @@ async fn one_channel() { let app = fixtures::scratch_app().await; let viewer = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint @@ -46,8 +46,8 @@ async fn multiple_channels() { let app = fixtures::scratch_app().await; let viewer = fixtures::login::create(&app).await; let channels = vec![ - fixtures::channel::create(&app).await, - fixtures::channel::create(&app).await, + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, ]; // Call the endpoint diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 23885c0..bb6697f 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -1,8 +1,10 @@ use axum::extract::{Json, State}; +use futures::{future, stream::StreamExt as _}; use crate::{ channel::{app, routes}, - test::fixtures, + events::types, + test::fixtures::{self, future::Immediately as _}, }; #[tokio::test] @@ -16,10 +18,14 @@ async fn new_channel() { let name = fixtures::channel::propose(); let request = routes::CreateRequest { name }; - let Json(response_channel) = - routes::on_create(State(app.clone()), creator, Json(request.clone())) - .await - .expect("new channel in an empty app"); + let Json(response_channel) = routes::on_create( + State(app.clone()), + creator, + fixtures::now(), + Json(request.clone()), + ) + .await + .expect("new channel in an empty app"); // Verify the structure of the response @@ -28,8 +34,23 @@ async fn new_channel() { // Verify the semantics let channels = app.channels().all().await.expect("always succeeds"); - assert!(channels.contains(&response_channel)); + + let mut events = app + .events() + .subscribe(&fixtures::now(), types::ResumePoint::default()) + .await + .expect("subscribing never fails") + .filter(|types::ResumableEvent(_, event)| future::ready(event.channel == response_channel)); + + let types::ResumableEvent(_, event) = events + .next() + .immediately() + .await + .expect("creation event published"); + + assert_eq!(types::Sequence::default(), event.sequence); + assert_eq!(types::ChannelEventData::Created, event.data); } #[tokio::test] @@ -38,15 +59,19 @@ async fn duplicate_name() { let app = fixtures::scratch_app().await; let creator = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint let request = routes::CreateRequest { name: channel.name }; - let routes::CreateError(error) = - routes::on_create(State(app.clone()), creator, Json(request.clone())) - .await - .expect_err("duplicate channel name"); + let routes::CreateError(error) = routes::on_create( + State(app.clone()), + creator, + fixtures::now(), + Json(request.clone()), + ) + .await + .expect_err("duplicate channel name"); // Verify the structure of the response diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 5d87bdc..e4de0f1 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -14,7 +14,7 @@ async fn messages_in_order() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint (twice) diff --git a/src/events/app.rs b/src/events/app.rs index 043a29b..134e86a 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -11,7 +11,7 @@ use sqlx::sqlite::SqlitePool; use super::{ broadcaster::Broadcaster, repo::message::Provider as _, - types::{self, ResumePoint}, + types::{self, ChannelEvent, ResumePoint}, }; use crate::{ clock::DateTime, @@ -66,6 +66,17 @@ impl<'a> Events<'a> { let mut tx = self.db.begin().await?; let channels = tx.channels().all().await?; + let created_events = { + let resume_at = resume_at.clone(); + let channels = channels.clone(); + stream::iter( + channels + .into_iter() + .map(ChannelEvent::created) + .filter(move |event| resume_at.not_after(event)), + ) + }; + // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.broadcaster.subscribe(); @@ -104,9 +115,9 @@ impl<'a> Events<'a> { // stored_messages. .filter(Self::resume(resume_live_at)); - Ok(replay - .chain(live_messages) - .scan(resume_at, |resume_point, event| { + Ok(created_events.chain(replay).chain(live_messages).scan( + resume_at, + |resume_point, event| { let channel = &event.channel.id; let sequence = event.sequence; resume_point.advance(channel, sequence); @@ -114,13 +125,14 @@ impl<'a> Events<'a> { let event = types::ResumableEvent(resume_point.clone(), event); future::ready(Some(event)) - })) + }, + )) } fn resume( resume_at: ResumePoint, ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready { - move |event| future::ready(resume_at < event.sequence()) + move |event| future::ready(resume_at.not_after(event)) } fn skip_expired( expire_at: &DateTime, diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index f289225..55ada95 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -15,7 +15,7 @@ async fn includes_historical_message() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; // Call the endpoint @@ -42,7 +42,7 @@ async fn includes_live_message() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint @@ -75,8 +75,8 @@ async fn includes_multiple_channels() { let sender = fixtures::login::create(&app).await; let channels = [ - fixtures::channel::create(&app).await, - fixtures::channel::create(&app).await, + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, ]; let messages = stream::iter(channels) @@ -117,7 +117,7 @@ async fn sequential_messages() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; let messages = vec![ @@ -156,7 +156,7 @@ async fn resumes_from() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; @@ -229,8 +229,8 @@ async fn serial_resume() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel_a = fixtures::channel::create(&app).await; - let channel_b = fixtures::channel::create(&app).await; + let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; + let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint @@ -346,7 +346,7 @@ async fn removes_expired_messages() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; diff --git a/src/events/types.rs b/src/events/types.rs index 6747afc..7c0e0a4 100644 --- a/src/events/types.rs +++ b/src/events/types.rs @@ -11,6 +11,7 @@ use crate::{ #[derive( Debug, + Default, Eq, Ord, PartialEq, @@ -59,7 +60,30 @@ impl ResumePoint { let Self(elements) = self; elements.get(channel).copied() } + + pub fn not_after(&self, event: impl ResumeElement) -> bool { + let Self(elements) = self; + let (channel, sequence) = event.element(); + + elements + .get(channel) + .map_or(true, |resume_at| resume_at < &sequence) + } } + +pub trait ResumeElement { + fn element(&self) -> (&channel::Id, Sequence); +} + +impl ResumeElement for &T +where + T: ResumeElement, +{ + fn element(&self) -> (&channel::Id, Sequence) { + (*self).element() + } +} + #[derive(Clone, Debug)] pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent); @@ -74,14 +98,26 @@ pub struct ChannelEvent { } impl ChannelEvent { - pub fn sequence(&self) -> ResumePoint { - ResumePoint::singleton(&self.channel.id, self.sequence) + pub fn created(channel: Channel) -> Self { + Self { + at: channel.created_at, + sequence: Sequence::default(), + channel, + data: ChannelEventData::Created, + } + } +} + +impl ResumeElement for ChannelEvent { + fn element(&self) -> (&channel::Id, Sequence) { + (&self.channel.id, self.sequence) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum ChannelEventData { + Created, Message(MessageEvent), } diff --git a/src/repo/channel.rs b/src/repo/channel.rs index d223dab..e85b898 100644 --- a/src/repo/channel.rs +++ b/src/repo/channel.rs @@ -2,7 +2,7 @@ use std::fmt; use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use crate::id::Id as BaseId; +use crate::{clock::DateTime, id::Id as BaseId}; pub trait Provider { fn channels(&mut self) -> Channels; @@ -20,22 +20,32 @@ pub struct Channels<'t>(&'t mut SqliteConnection); pub struct Channel { pub id: Id, pub name: String, + #[serde(skip)] + pub created_at: DateTime, } impl<'c> Channels<'c> { - pub async fn create(&mut self, name: &str) -> Result { + pub async fn create( + &mut self, + name: &str, + created_at: &DateTime, + ) -> Result { let id = Id::generate(); let channel = sqlx::query_as!( Channel, r#" insert - into channel (id, name) - values ($1, $2) - returning id as "id: Id", name + into channel (id, name, created_at) + values ($1, $2, $3) + returning + id as "id: Id", + name, + created_at as "created_at: DateTime" "#, id, name, + created_at, ) .fetch_one(&mut *self.0) .await?; @@ -47,7 +57,10 @@ impl<'c> Channels<'c> { let channel = sqlx::query_as!( Channel, r#" - select id as "id: Id", name + select + id as "id: Id", + name, + created_at as "created_at: DateTime" from channel where id = $1 "#, @@ -64,8 +77,9 @@ impl<'c> Channels<'c> { Channel, r#" select - channel.id as "id: Id", - channel.name + id as "id: Id", + name, + created_at as "created_at: DateTime" from channel order by channel.name "#, diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs index 0558395..8744470 100644 --- a/src/test/fixtures/channel.rs +++ b/src/test/fixtures/channel.rs @@ -4,12 +4,12 @@ use faker_rand::{ }; use rand; -use crate::{app::App, repo::channel::Channel}; +use crate::{app::App, clock::RequestedAt, repo::channel::Channel}; -pub async fn create(app: &App) -> Channel { +pub async fn create(app: &App, created_at: &RequestedAt) -> Channel { let name = propose(); app.channels() - .create(&name) + .create(&name, created_at) .await .expect("should always succeed if the channel is actually new") } -- cgit v1.2.3 From 60b711c844f8624348d5d1dac3a625532a8e2a82 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Sep 2024 23:03:46 -0400 Subject: Delete expired messages out of band. Trying to reliably do expiry mid-request was causing some anomalies: * Creating a channel with a dup name would fail, then succeed after listing channels. It was very hard to reason about which operations needed to trigger expiry, to fix this "correctly," so now expiry runs on every request. --- ...85eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json | 12 --- ...e61c4b10c6f90b1221e963db69c8e6d23e99012ecf.json | 32 ++++++++ ...b3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json | 32 -------- ...a2d368676f279af866d0840d6c2c093b87b1eadd8c.json | 38 +++++++++ ...76cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json | 20 +++++ src/channel/routes/test/on_create.rs | 2 +- src/channel/routes/test/on_send.rs | 4 +- src/cli.rs | 4 + src/events/app.rs | 40 ++++++---- src/events/expire.rs | 18 +++++ src/events/mod.rs | 1 + src/events/repo/message.rs | 70 ++++++++++++++--- src/events/routes.rs | 5 +- src/events/routes/test.rs | 91 +++++++--------------- src/events/types.rs | 12 +++ src/repo/channel.rs | 8 +- src/test/fixtures/filter.rs | 9 +++ src/test/fixtures/mod.rs | 1 + 18 files changed, 258 insertions(+), 141 deletions(-) delete mode 100644 .sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json create mode 100644 .sqlx/query-aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf.json delete mode 100644 .sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json create mode 100644 .sqlx/query-df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c.json create mode 100644 .sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json create mode 100644 src/events/expire.rs create mode 100644 src/test/fixtures/filter.rs (limited to 'src/test') diff --git a/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json b/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json deleted file mode 100644 index 9edc1af..0000000 --- a/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n delete from message\n where sent_at < $1\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 1 - }, - "nullable": [] - }, - "hash": "61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d" -} diff --git a/.sqlx/query-aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf.json b/.sqlx/query-aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf.json new file mode 100644 index 0000000..5c27826 --- /dev/null +++ b/.sqlx/query-aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n insert\n into channel (id, name, created_at, last_sequence)\n values ($1, $2, $3, $4)\n returning\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 4 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf" +} diff --git a/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json b/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json deleted file mode 100644 index 64d56dd..0000000 --- a/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert\n into channel (id, name, created_at)\n values ($1, $2, $3)\n returning\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "name", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "created_at: DateTime", - "ordinal": 2, - "type_info": "Text" - } - ], - "parameters": { - "Right": 3 - }, - "nullable": [ - false, - false, - false - ] - }, - "hash": "bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33" -} diff --git a/.sqlx/query-df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c.json b/.sqlx/query-df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c.json new file mode 100644 index 0000000..87e478e --- /dev/null +++ b/.sqlx/query-df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c.json @@ -0,0 +1,38 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n channel.created_at as \"channel_created_at: DateTime\",\n message.id as \"message: message::Id\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where sent_at < $1\n ", + "describe": { + "columns": [ + { + "name": "channel_id: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel_name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "channel_created_at: DateTime", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "message: message::Id", + "ordinal": 3, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c" +} diff --git a/.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json b/.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json new file mode 100644 index 0000000..7b1d2d8 --- /dev/null +++ b/.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n delete from message\n where id = $1\n returning 1 as \"row: i64\"\n ", + "describe": { + "columns": [ + { + "name": "row: i64", + "ordinal": 0, + "type_info": "Null" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + null + ] + }, + "hash": "f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9" +} diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index bb6697f..5e62d7f 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -38,7 +38,7 @@ async fn new_channel() { let mut events = app .events() - .subscribe(&fixtures::now(), types::ResumePoint::default()) + .subscribe(types::ResumePoint::default()) .await .expect("subscribing never fails") .filter(|types::ResumableEvent(_, event)| future::ready(event.channel == response_channel)); diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 20ae016..233518b 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -41,12 +41,12 @@ async fn messages_in_order() { // Verify the semantics - let subscribed_at = fixtures::now(); let events = app .events() - .subscribe(&subscribed_at, types::ResumePoint::default()) + .subscribe(types::ResumePoint::default()) .await .expect("subscribing to a valid channel") + .filter(fixtures::filter::messages()) .take(requests.len()); let events = events.collect::>().immediately().await; diff --git a/src/cli.rs b/src/cli.rs index a6d752c..472d68f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -72,6 +72,10 @@ impl Args { let app = App::from(pool); let app = routers() + .route_layer(middleware::from_fn_with_state( + app.clone(), + events::expire::middleware, + )) .route_layer(middleware::from_fn(clock::middleware)) .with_state(app); diff --git a/src/events/app.rs b/src/events/app.rs index 134e86a..03f3ee6 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -55,14 +55,35 @@ impl<'a> Events<'a> { Ok(event) } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + let expired = tx.message_events().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for (channel, message) in expired { + let event = tx + .message_events() + .delete_expired(&channel, &message, relative_to) + .await?; + events.push(event); + } + + tx.commit().await?; + + for event in events { + self.broadcaster.broadcast(&event); + } + + Ok(()) + } + pub async fn subscribe( &self, - subscribed_at: &DateTime, resume_at: ResumePoint, ) -> Result + std::fmt::Debug, sqlx::Error> { - // Somewhat arbitrarily, expire after 90 days. - let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); - let mut tx = self.db.begin().await?; let channels = tx.channels().all().await?; @@ -81,8 +102,6 @@ impl<'a> Events<'a> { // querying the DB. We'll prune out duplicates later. let live_messages = self.broadcaster.subscribe(); - tx.message_events().expire(&expire_at).await?; - let mut replays = BTreeMap::new(); let mut resume_live_at = resume_at.clone(); for channel in channels { @@ -107,9 +126,6 @@ impl<'a> Events<'a> { // * resume is redundant with the resume_at argument to // `tx.broadcasts().replay(…)`. let live_messages = live_messages - // Sure, it's temporally improbable that we'll ever skip a message - // that's 90 days old, but there's no reason not to be thorough. - .filter(Self::skip_expired(&expire_at)) // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // stored_messages. @@ -134,12 +150,6 @@ impl<'a> Events<'a> { ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready { move |event| future::ready(resume_at.not_after(event)) } - fn skip_expired( - expire_at: &DateTime, - ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready { - let expire_at = expire_at.to_owned(); - move |event| future::ready(expire_at < event.at) - } } #[derive(Debug, thiserror::Error)] diff --git a/src/events/expire.rs b/src/events/expire.rs new file mode 100644 index 0000000..d92142d --- /dev/null +++ b/src/events/expire.rs @@ -0,0 +1,18 @@ +use axum::{ + extract::{Request, State}, + middleware::Next, + response::Response, +}; + +use crate::{app::App, clock::RequestedAt, error::Internal}; + +// Expires messages and channels before each request. +pub async fn middleware( + State(app): State, + RequestedAt(expired_at): RequestedAt, + req: Request, + next: Next, +) -> Result { + app.events().expire(&expired_at).await?; + Ok(next.run(req).await) +} diff --git a/src/events/mod.rs b/src/events/mod.rs index 711ae64..86bc5e9 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1,5 +1,6 @@ pub mod app; pub mod broadcaster; +pub mod expire; mod extract; pub mod repo; mod routes; diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs index f6fce0e..32419d5 100644 --- a/src/events/repo/message.rs +++ b/src/events/repo/message.rs @@ -6,7 +6,7 @@ use crate::{ repo::{ channel::{self, Channel}, login::{self, Login}, - message, + message::{self, Message}, }, }; @@ -30,7 +30,7 @@ impl<'c> Events<'c> { body: &str, sent_at: &DateTime, ) -> Result { - let sequence = self.assign_sequence(&channel.id).await?; + let sequence = self.assign_sequence(channel).await?; let id = message::Id::generate(); @@ -59,7 +59,7 @@ impl<'c> Events<'c> { channel: channel.clone(), data: types::MessageEvent { sender: sender.clone(), - message: message::Message { + message: Message { id: row.id, body: row.body, }, @@ -72,7 +72,7 @@ impl<'c> Events<'c> { Ok(message) } - async fn assign_sequence(&mut self, channel: &channel::Id) -> Result { + async fn assign_sequence(&mut self, channel: &Channel) -> Result { let next = sqlx::query_scalar!( r#" update channel @@ -80,7 +80,7 @@ impl<'c> Events<'c> { where id = $1 returning last_sequence as "next_sequence: Sequence" "#, - channel, + channel.id, ) .fetch_one(&mut *self.0) .await?; @@ -88,18 +88,68 @@ impl<'c> Events<'c> { Ok(next) } - pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { - sqlx::query!( + pub async fn delete_expired( + &mut self, + channel: &Channel, + message: &message::Id, + deleted_at: &DateTime, + ) -> Result { + let sequence = self.assign_sequence(channel).await?; + + sqlx::query_scalar!( r#" delete from message + where id = $1 + returning 1 as "row: i64" + "#, + message, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(types::ChannelEvent { + sequence, + at: *deleted_at, + channel: channel.clone(), + data: types::MessageDeletedEvent { + message: message.clone(), + } + .into(), + }) + } + + pub async fn expired( + &mut self, + expire_at: &DateTime, + ) -> Result, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + channel.created_at as "channel_created_at: DateTime", + message.id as "message: message::Id" + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id where sent_at < $1 "#, expire_at, ) - .execute(&mut *self.0) + .map(|row| { + ( + Channel { + id: row.channel_id, + name: row.channel_name, + created_at: row.channel_created_at, + }, + row.message, + ) + }) + .fetch_all(&mut *self.0) .await?; - Ok(()) + Ok(messages) } pub async fn replay( @@ -134,7 +184,7 @@ impl<'c> Events<'c> { id: row.sender_id, name: row.sender_name, }, - message: message::Message { + message: Message { id: row.id, body: row.body, }, diff --git a/src/events/routes.rs b/src/events/routes.rs index 3f70dcd..89c942c 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -13,7 +13,7 @@ use super::{ extract::LastEventId, types::{self, ResumePoint}, }; -use crate::{app::App, clock::RequestedAt, error::Internal, repo::login::Login}; +use crate::{app::App, error::Internal, repo::login::Login}; #[cfg(test)] mod test; @@ -24,7 +24,6 @@ pub fn router() -> Router { async fn events( State(app): State, - RequestedAt(subscribed_at): RequestedAt, _: Login, // requires auth, but doesn't actually care who you are last_event_id: Option>, ) -> Result + std::fmt::Debug>, Internal> { @@ -32,7 +31,7 @@ async fn events( .map(LastEventId::into_inner) .unwrap_or_default(); - let stream = app.events().subscribe(&subscribed_at, resume_at).await?; + let stream = app.events().subscribe(resume_at).await?; Ok(Events(stream)) } diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index 55ada95..a6e2275 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -21,14 +21,14 @@ async fn includes_historical_message() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None) + let routes::Events(events) = routes::events(State(app), subscriber, None) .await .expect("subscribe never fails"); // Verify the structure of the response. let types::ResumableEvent(_, event) = events + .filter(fixtures::filter::messages()) .next() .immediately() .await @@ -47,11 +47,9 @@ async fn includes_live_message() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let routes::Events(mut events) = - routes::events(State(app.clone()), subscribed_at, subscriber, None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = routes::events(State(app.clone()), subscriber, None) + .await + .expect("subscribe never fails"); // Verify the semantics @@ -59,6 +57,7 @@ async fn includes_live_message() { let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; let types::ResumableEvent(_, event) = events + .filter(fixtures::filter::messages()) .next() .immediately() .await @@ -92,14 +91,14 @@ async fn includes_multiple_channels() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None) + let routes::Events(events) = routes::events(State(app), subscriber, None) .await .expect("subscribe never fails"); // Verify the structure of the response. let events = events + .filter(fixtures::filter::messages()) .take(messages.len()) .collect::>() .immediately() @@ -129,8 +128,7 @@ async fn sequential_messages() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None) + let routes::Events(events) = routes::events(State(app), subscriber, None) .await .expect("subscribe never fails"); @@ -169,17 +167,19 @@ async fn resumes_from() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); let resume_at = { // First subscription - let routes::Events(mut events) = - routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) + .await + .expect("subscribe never fails"); - let types::ResumableEvent(last_event_id, event) = - events.next().immediately().await.expect("delivered events"); + let types::ResumableEvent(last_event_id, event) = events + .filter(fixtures::filter::messages()) + .next() + .immediately() + .await + .expect("delivered events"); assert_eq!(initial_message, event); @@ -187,11 +187,9 @@ async fn resumes_from() { }; // Resume after disconnect - let reconnect_at = fixtures::now(); - let routes::Events(resumed) = - routes::events(State(app), reconnect_at, subscriber, Some(resume_at.into())) - .await - .expect("subscribe never fails"); + let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into())) + .await + .expect("subscribe never fails"); // Verify the structure of the response. @@ -243,13 +241,12 @@ async fn serial_resume() { ]; // First subscription - let subscribed_at = fixtures::now(); - let routes::Events(events) = - routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) + .await + .expect("subscribe never fails"); let events = events + .filter(fixtures::filter::messages()) .take(initial_messages.len()) .collect::>() .immediately() @@ -277,10 +274,8 @@ async fn serial_resume() { ]; // Second subscription - let resubscribed_at = fixtures::now(); let routes::Events(events) = routes::events( State(app.clone()), - resubscribed_at, subscriber.clone(), Some(resume_at.into()), ) @@ -288,6 +283,7 @@ async fn serial_resume() { .expect("subscribe never fails"); let events = events + .filter(fixtures::filter::messages()) .take(resume_messages.len()) .collect::>() .immediately() @@ -314,11 +310,9 @@ async fn serial_resume() { fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, ]; - // Second subscription - let resubscribed_at = fixtures::now(); + // Third subscription let routes::Events(events) = routes::events( State(app.clone()), - resubscribed_at, subscriber.clone(), Some(resume_at.into()), ) @@ -326,6 +320,7 @@ async fn serial_resume() { .expect("subscribe never fails"); let events = events + .filter(fixtures::filter::messages()) .take(final_messages.len()) .collect::>() .immediately() @@ -340,33 +335,3 @@ async fn serial_resume() { } }; } - -#[tokio::test] -async fn removes_expired_messages() { - // Set up the environment - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - - let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None) - .await - .expect("subscribe never fails"); - - // Verify the semantics - - let types::ResumableEvent(_, event) = events - .next() - .immediately() - .await - .expect("delivered messages"); - - assert_eq!(message, event); -} diff --git a/src/events/types.rs b/src/events/types.rs index 944321a..9a65207 100644 --- a/src/events/types.rs +++ b/src/events/types.rs @@ -119,6 +119,7 @@ impl ResumeElement for ChannelEvent { pub enum ChannelEventData { Created, Message(MessageEvent), + MessageDeleted(MessageDeletedEvent), } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] @@ -132,3 +133,14 @@ impl From for ChannelEventData { Self::Message(message) } } + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct MessageDeletedEvent { + pub message: message::Id, +} + +impl From for ChannelEventData { + fn from(message: MessageDeletedEvent) -> Self { + Self::MessageDeleted(message) + } +} diff --git a/src/repo/channel.rs b/src/repo/channel.rs index e85b898..6514426 100644 --- a/src/repo/channel.rs +++ b/src/repo/channel.rs @@ -2,7 +2,7 @@ use std::fmt; use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use crate::{clock::DateTime, id::Id as BaseId}; +use crate::{clock::DateTime, events::types::Sequence, id::Id as BaseId}; pub trait Provider { fn channels(&mut self) -> Channels; @@ -31,13 +31,14 @@ impl<'c> Channels<'c> { created_at: &DateTime, ) -> Result { let id = Id::generate(); + let sequence = Sequence::default(); let channel = sqlx::query_as!( Channel, r#" insert - into channel (id, name, created_at) - values ($1, $2, $3) + into channel (id, name, created_at, last_sequence) + values ($1, $2, $3, $4) returning id as "id: Id", name, @@ -46,6 +47,7 @@ impl<'c> Channels<'c> { id, name, created_at, + sequence, ) .fetch_one(&mut *self.0) .await?; diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs new file mode 100644 index 0000000..8847e13 --- /dev/null +++ b/src/test/fixtures/filter.rs @@ -0,0 +1,9 @@ +use futures::future; + +use crate::events::types; + +pub fn messages() -> impl FnMut(&types::ResumableEvent) -> future::Ready { + |types::ResumableEvent(_, event)| { + future::ready(matches!(event.data, types::ChannelEventData::Message(_))) + } +} diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 450fbec..d1dd0c3 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -3,6 +3,7 @@ use chrono::{TimeDelta, Utc}; use crate::{app::App, clock::RequestedAt, repo::pool}; pub mod channel; +pub mod filter; pub mod future; pub mod identity; pub mod login; -- cgit v1.2.3 From 155f6f2556b21e6b25afe096b19adcde1255c598 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Sep 2024 23:46:55 -0400 Subject: Expire channels, too. --- ...aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json | 32 +++++++++++++ ...6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json | 20 ++++++++ docs/api.md | 21 ++++++++- src/channel/app.rs | 29 +++++++++++- src/channel/routes/test/on_create.rs | 10 ++-- src/cli.rs | 4 +- src/events/app.rs | 11 +++-- src/events/expire.rs | 18 ------- src/events/mod.rs | 1 - src/events/repo/message.rs | 11 ++--- src/events/types.rs | 55 ++++++++++++++++++---- src/expire.rs | 20 ++++++++ src/lib.rs | 1 + src/login/app.rs | 15 ++++-- src/login/routes/test/login.rs | 6 +++ src/repo/channel.rs | 52 +++++++++++++++++++- src/test/fixtures/filter.rs | 6 +++ 17 files changed, 262 insertions(+), 50 deletions(-) create mode 100644 .sqlx/query-6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json create mode 100644 .sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json delete mode 100644 src/events/expire.rs create mode 100644 src/expire.rs (limited to 'src/test') diff --git a/.sqlx/query-6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json b/.sqlx/query-6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json new file mode 100644 index 0000000..ae298d6 --- /dev/null +++ b/.sqlx/query-6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"id: Id\",\n channel.name,\n channel.created_at as \"created_at: DateTime\"\n from channel\n left join message\n where created_at < $1\n and message.id is null\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259" +} diff --git a/.sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json b/.sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json new file mode 100644 index 0000000..1d448d4 --- /dev/null +++ b/.sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n delete from channel\n where id = $1\n returning 1 as \"row: i64\"\n ", + "describe": { + "columns": [ + { + "name": "row: i64", + "ordinal": 0, + "type_info": "Null" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + null + ] + }, + "hash": "d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb" +} diff --git a/docs/api.md b/docs/api.md index 9d803ad..e18c6d5 100644 --- a/docs/api.md +++ b/docs/api.md @@ -188,8 +188,27 @@ data: "id": "L1234abcd", data: "name": "example username" data: }, data: "message": { -data: "id": "Mxnjcf3y41prfry9", +data: "id": "M1312acab", data: "body": "beep" data: } data: } + +id: 1235 +data: { +data: "at": "2024-09-28T02:44:27.077355Z", +data: "channel": { +data: "id": "C9876cyyz", +data: "name": "example channel 2" +data: }, +data: "type": "message_deleted", +data: "message": "M1312acab" +data: } + +id: 1236 +data: { +data: "at": "2024-09-28T03:40:25.384318Z", +data: "type": "deleted", +data: "channel": "C9876cyyz" +data: } + ``` diff --git a/src/channel/app.rs b/src/channel/app.rs index 1eeca79..d7312e4 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,8 +1,9 @@ +use chrono::TimeDelta; use sqlx::sqlite::SqlitePool; use crate::{ clock::DateTime, - events::{broadcaster::Broadcaster, types::ChannelEvent}, + events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent}, repo::channel::{Channel, Provider as _}, }; @@ -38,6 +39,32 @@ impl<'a> Channels<'a> { Ok(channels) } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + let expired = tx.channels().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for channel in expired { + let sequence = tx.message_events().assign_sequence(&channel).await?; + let event = tx + .channels() + .delete_expired(&channel, sequence, relative_to) + .await?; + events.push(event); + } + + tx.commit().await?; + + for event in events { + self.broadcaster.broadcast(&event); + } + + Ok(()) + } } #[derive(Debug, thiserror::Error)] diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 5e62d7f..e2610a5 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -1,5 +1,5 @@ use axum::extract::{Json, State}; -use futures::{future, stream::StreamExt as _}; +use futures::stream::StreamExt as _; use crate::{ channel::{app, routes}, @@ -41,7 +41,7 @@ async fn new_channel() { .subscribe(types::ResumePoint::default()) .await .expect("subscribing never fails") - .filter(|types::ResumableEvent(_, event)| future::ready(event.channel == response_channel)); + .filter(fixtures::filter::created()); let types::ResumableEvent(_, event) = events .next() @@ -50,7 +50,11 @@ async fn new_channel() { .expect("creation event published"); assert_eq!(types::Sequence::default(), event.sequence); - assert_eq!(types::ChannelEventData::Created, event.data); + assert!(matches!( + event.data, + types::ChannelEventData::Created(event) + if event.channel == response_channel + )); } #[tokio::test] diff --git a/src/cli.rs b/src/cli.rs index 472d68f..132baf8 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -10,7 +10,7 @@ use clap::Parser; use sqlx::sqlite::SqlitePool; use tokio::net; -use crate::{app::App, channel, clock, events, login, repo::pool}; +use crate::{app::App, channel, clock, events, expire, login, repo::pool}; /// Command-line entry point for running the `hi` server. /// @@ -74,7 +74,7 @@ impl Args { let app = routers() .route_layer(middleware::from_fn_with_state( app.clone(), - events::expire::middleware, + expire::middleware, )) .route_layer(middleware::from_fn(clock::middleware)) .with_state(app); diff --git a/src/events/app.rs b/src/events/app.rs index 03f3ee6..5162c67 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -64,9 +64,10 @@ impl<'a> Events<'a> { let mut events = Vec::with_capacity(expired.len()); for (channel, message) in expired { + let sequence = tx.message_events().assign_sequence(&channel).await?; let event = tx .message_events() - .delete_expired(&channel, &message, relative_to) + .delete_expired(&channel, &message, sequence, relative_to) .await?; events.push(event); } @@ -134,9 +135,11 @@ impl<'a> Events<'a> { Ok(created_events.chain(replay).chain(live_messages).scan( resume_at, |resume_point, event| { - let channel = &event.channel.id; - let sequence = event.sequence; - resume_point.advance(channel, sequence); + let channel = &event.channel_id(); + match event.data { + types::ChannelEventData::Deleted(_) => resume_point.forget(channel), + _ => resume_point.advance(channel, event.sequence), + } let event = types::ResumableEvent(resume_point.clone(), event); diff --git a/src/events/expire.rs b/src/events/expire.rs deleted file mode 100644 index d92142d..0000000 --- a/src/events/expire.rs +++ /dev/null @@ -1,18 +0,0 @@ -use axum::{ - extract::{Request, State}, - middleware::Next, - response::Response, -}; - -use crate::{app::App, clock::RequestedAt, error::Internal}; - -// Expires messages and channels before each request. -pub async fn middleware( - State(app): State, - RequestedAt(expired_at): RequestedAt, - req: Request, - next: Next, -) -> Result { - app.events().expire(&expired_at).await?; - Ok(next.run(req).await) -} diff --git a/src/events/mod.rs b/src/events/mod.rs index 86bc5e9..711ae64 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1,6 +1,5 @@ pub mod app; pub mod broadcaster; -pub mod expire; mod extract; pub mod repo; mod routes; diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs index 32419d5..f8bae2b 100644 --- a/src/events/repo/message.rs +++ b/src/events/repo/message.rs @@ -56,8 +56,8 @@ impl<'c> Events<'c> { .map(|row| types::ChannelEvent { sequence: row.sequence, at: row.sent_at, - channel: channel.clone(), data: types::MessageEvent { + channel: channel.clone(), sender: sender.clone(), message: Message { id: row.id, @@ -72,7 +72,7 @@ impl<'c> Events<'c> { Ok(message) } - async fn assign_sequence(&mut self, channel: &Channel) -> Result { + pub async fn assign_sequence(&mut self, channel: &Channel) -> Result { let next = sqlx::query_scalar!( r#" update channel @@ -92,10 +92,9 @@ impl<'c> Events<'c> { &mut self, channel: &Channel, message: &message::Id, + sequence: Sequence, deleted_at: &DateTime, ) -> Result { - let sequence = self.assign_sequence(channel).await?; - sqlx::query_scalar!( r#" delete from message @@ -110,8 +109,8 @@ impl<'c> Events<'c> { Ok(types::ChannelEvent { sequence, at: *deleted_at, - channel: channel.clone(), data: types::MessageDeletedEvent { + channel: channel.clone(), message: message.clone(), } .into(), @@ -178,8 +177,8 @@ impl<'c> Events<'c> { .map(|row| types::ChannelEvent { sequence: row.sequence, at: row.sent_at, - channel: channel.clone(), data: types::MessageEvent { + channel: channel.clone(), sender: login::Login { id: row.sender_id, name: row.sender_name, diff --git a/src/events/types.rs b/src/events/types.rs index 9a65207..966842d 100644 --- a/src/events/types.rs +++ b/src/events/types.rs @@ -56,6 +56,11 @@ impl ResumePoint { elements.insert(channel.clone(), sequence); } + pub fn forget(&mut self, channel: &channel::Id) { + let Self(elements) = self; + elements.remove(channel); + } + pub fn get(&self, channel: &channel::Id) -> Option { let Self(elements) = self; elements.get(channel).copied() @@ -92,7 +97,6 @@ pub struct ChannelEvent { #[serde(skip)] pub sequence: Sequence, pub at: DateTime, - pub channel: Channel, #[serde(flatten)] pub data: ChannelEventData, } @@ -102,45 +106,78 @@ impl ChannelEvent { Self { at: channel.created_at, sequence: Sequence::default(), - channel, - data: ChannelEventData::Created, + data: CreatedEvent { channel }.into(), + } + } + + pub fn channel_id(&self) -> &channel::Id { + match &self.data { + ChannelEventData::Created(event) => &event.channel.id, + ChannelEventData::Message(event) => &event.channel.id, + ChannelEventData::MessageDeleted(event) => &event.channel.id, + ChannelEventData::Deleted(event) => &event.channel, } } } impl ResumeElement for ChannelEvent { fn element(&self) -> (&channel::Id, Sequence) { - (&self.channel.id, self.sequence) + (self.channel_id(), self.sequence) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum ChannelEventData { - Created, + Created(CreatedEvent), Message(MessageEvent), MessageDeleted(MessageDeletedEvent), + Deleted(DeletedEvent), +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct CreatedEvent { + pub channel: Channel, +} + +impl From for ChannelEventData { + fn from(event: CreatedEvent) -> Self { + Self::Created(event) + } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct MessageEvent { + pub channel: Channel, pub sender: Login, pub message: message::Message, } impl From for ChannelEventData { - fn from(message: MessageEvent) -> Self { - Self::Message(message) + fn from(event: MessageEvent) -> Self { + Self::Message(event) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct MessageDeletedEvent { + pub channel: Channel, pub message: message::Id, } impl From for ChannelEventData { - fn from(message: MessageDeletedEvent) -> Self { - Self::MessageDeleted(message) + fn from(event: MessageDeletedEvent) -> Self { + Self::MessageDeleted(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct DeletedEvent { + pub channel: channel::Id, +} + +impl From for ChannelEventData { + fn from(event: DeletedEvent) -> Self { + Self::Deleted(event) } } diff --git a/src/expire.rs b/src/expire.rs new file mode 100644 index 0000000..16006d1 --- /dev/null +++ b/src/expire.rs @@ -0,0 +1,20 @@ +use axum::{ + extract::{Request, State}, + middleware::Next, + response::Response, +}; + +use crate::{app::App, clock::RequestedAt, error::Internal}; + +// Expires messages and channels before each request. +pub async fn middleware( + State(app): State, + RequestedAt(expired_at): RequestedAt, + req: Request, + next: Next, +) -> Result { + app.logins().expire(&expired_at).await?; + app.events().expire(&expired_at).await?; + app.channels().expire(&expired_at).await?; + Ok(next.run(req).await) +} diff --git a/src/lib.rs b/src/lib.rs index f731e57..4139d4d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ pub mod cli; mod clock; mod error; mod events; +mod expire; mod id; mod login; mod password; diff --git a/src/login/app.rs b/src/login/app.rs index 10609c6..292b95f 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -60,11 +60,7 @@ impl<'a> Logins<'a> { } pub async fn validate(&self, secret: &str, used_at: &DateTime) -> Result { - // Somewhat arbitrarily, expire after 7 days. - let expire_at = used_at.to_owned() - TimeDelta::days(7); - let mut tx = self.db.begin().await?; - tx.tokens().expire(&expire_at).await?; let login = tx .tokens() .validate(secret, used_at) @@ -75,6 +71,17 @@ impl<'a> Logins<'a> { Ok(login) } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 7 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + tx.tokens().expire(&expire_at).await?; + tx.commit().await?; + + Ok(()) + } + pub async fn logout(&self, secret: &str) -> Result<(), ValidateError> { let mut tx = self.db.begin().await?; tx.tokens() diff --git a/src/login/routes/test/login.rs b/src/login/routes/test/login.rs index d92c01b..719ccca 100644 --- a/src/login/routes/test/login.rs +++ b/src/login/routes/test/login.rs @@ -126,6 +126,12 @@ async fn token_expires() { // Verify the semantics + let expired_at = fixtures::now(); + app.logins() + .expire(&expired_at) + .await + .expect("expiring tokens never fails"); + let verified_at = fixtures::now(); let error = app .logins() diff --git a/src/repo/channel.rs b/src/repo/channel.rs index 6514426..3c7468f 100644 --- a/src/repo/channel.rs +++ b/src/repo/channel.rs @@ -2,7 +2,11 @@ use std::fmt; use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use crate::{clock::DateTime, events::types::Sequence, id::Id as BaseId}; +use crate::{ + clock::DateTime, + events::types::{self, Sequence}, + id::Id as BaseId, +}; pub trait Provider { fn channels(&mut self) -> Channels; @@ -91,6 +95,52 @@ impl<'c> Channels<'c> { Ok(channels) } + + pub async fn delete_expired( + &mut self, + channel: &Channel, + sequence: Sequence, + deleted_at: &DateTime, + ) -> Result { + let channel = channel.id.clone(); + sqlx::query_scalar!( + r#" + delete from channel + where id = $1 + returning 1 as "row: i64" + "#, + channel, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(types::ChannelEvent { + sequence, + at: *deleted_at, + data: types::DeletedEvent { channel }.into(), + }) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { + let channels = sqlx::query_as!( + Channel, + r#" + select + channel.id as "id: Id", + channel.name, + channel.created_at as "created_at: DateTime" + from channel + left join message + where created_at < $1 + and message.id is null + "#, + expired_at, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } } // Stable identifier for a [Channel]. Prefixed with `C`. diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs index 8847e13..fbebced 100644 --- a/src/test/fixtures/filter.rs +++ b/src/test/fixtures/filter.rs @@ -7,3 +7,9 @@ pub fn messages() -> impl FnMut(&types::ResumableEvent) -> future::Ready { future::ready(matches!(event.data, types::ChannelEventData::Message(_))) } } + +pub fn created() -> impl FnMut(&types::ResumableEvent) -> future::Ready { + |types::ResumableEvent(_, event)| { + future::ready(matches!(event.data, types::ChannelEventData::Created(_))) + } +} -- cgit v1.2.3 From 4d0bb0709b168a24ab6a8dbc86da45d7503596ee Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Sat, 28 Sep 2024 01:40:22 -0400 Subject: Wrap credential and credential-holding types to prevent `Debug` leaks. The following values are considered confidential, and should never be logged, even by accident: * `Password`, which is a durable bearer token for a specific Login; * `IdentitySecret`, which is an ephemeral but potentially long-lived bearer token for a specific Login; or * `IdentityToken`, which may hold cookies containing an `IdentitySecret`. These values are now wrapped in types whose `Debug` impls output opaque values, so that they can be included in structs that `#[derive(Debug)]` without requiring any additional care. The wrappers also avoid implementing `Display`, to prevent inadvertent `to_string()`s. We don't bother obfuscating `IdentitySecret`s in memory or in the `.hi` database. There's no point: we'd also need to store the information needed to de-obfuscate them, and they can be freely invalidated and replaced by blanking that table and asking everyone to log in again. Passwords _are_ obfuscated for storage, as they're intended to be durable. --- ...73a90e3912d23bf952740fe32544bc70a44e6a2744.json | 20 +++++++++ ...b68a8abceb822eb5db2e7dd8e509d4f79c106f8561.json | 20 --------- src/login/app.rs | 22 +++++----- src/login/extract.rs | 46 ++++++++++++++++++--- src/login/routes.rs | 10 +++-- src/login/routes/test/login.rs | 8 ++-- src/login/routes/test/logout.rs | 2 +- src/password.rs | 47 +++++++++++++++++----- src/repo/login/extract.rs | 2 +- src/repo/token.rs | 10 ++--- src/test/fixtures/identity.rs | 15 ++++--- src/test/fixtures/login.rs | 9 +++-- 12 files changed, 143 insertions(+), 68 deletions(-) create mode 100644 .sqlx/query-c28b9bffa73d6a861e122a73a90e3912d23bf952740fe32544bc70a44e6a2744.json delete mode 100644 .sqlx/query-df84b2afcb1493b3643a83b68a8abceb822eb5db2e7dd8e509d4f79c106f8561.json (limited to 'src/test') diff --git a/.sqlx/query-c28b9bffa73d6a861e122a73a90e3912d23bf952740fe32544bc70a44e6a2744.json b/.sqlx/query-c28b9bffa73d6a861e122a73a90e3912d23bf952740fe32544bc70a44e6a2744.json new file mode 100644 index 0000000..5927248 --- /dev/null +++ b/.sqlx/query-c28b9bffa73d6a861e122a73a90e3912d23bf952740fe32544bc70a44e6a2744.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n insert\n into token (secret, login, issued_at, last_used_at)\n values ($1, $2, $3, $3)\n returning secret as \"secret!: IdentitySecret\"\n ", + "describe": { + "columns": [ + { + "name": "secret!: IdentitySecret", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 3 + }, + "nullable": [ + false + ] + }, + "hash": "c28b9bffa73d6a861e122a73a90e3912d23bf952740fe32544bc70a44e6a2744" +} diff --git a/.sqlx/query-df84b2afcb1493b3643a83b68a8abceb822eb5db2e7dd8e509d4f79c106f8561.json b/.sqlx/query-df84b2afcb1493b3643a83b68a8abceb822eb5db2e7dd8e509d4f79c106f8561.json deleted file mode 100644 index c788557..0000000 --- a/.sqlx/query-df84b2afcb1493b3643a83b68a8abceb822eb5db2e7dd8e509d4f79c106f8561.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert\n into token (secret, login, issued_at, last_used_at)\n values ($1, $2, $3, $3)\n returning secret as \"secret!\"\n ", - "describe": { - "columns": [ - { - "name": "secret!", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 3 - }, - "nullable": [ - false - ] - }, - "hash": "df84b2afcb1493b3643a83b68a8abceb822eb5db2e7dd8e509d4f79c106f8561" -} diff --git a/src/login/app.rs b/src/login/app.rs index 292b95f..f7fec88 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -1,10 +1,10 @@ use chrono::TimeDelta; use sqlx::sqlite::SqlitePool; -use super::repo::auth::Provider as _; +use super::{extract::IdentitySecret, repo::auth::Provider as _}; use crate::{ clock::DateTime, - password::StoredHash, + password::Password, repo::{ error::NotFound as _, login::{Login, Provider as _}, @@ -24,9 +24,9 @@ impl<'a> Logins<'a> { pub async fn login( &self, name: &str, - password: &str, + password: &Password, login_at: &DateTime, - ) -> Result { + ) -> Result { let mut tx = self.db.begin().await?; let login = if let Some((login, stored_hash)) = tx.auth().for_name(name).await? { @@ -38,7 +38,7 @@ impl<'a> Logins<'a> { return Err(LoginError::Rejected); } } else { - let password_hash = StoredHash::new(password)?; + let password_hash = password.hash()?; tx.logins().create(name, &password_hash).await? }; @@ -49,8 +49,8 @@ impl<'a> Logins<'a> { } #[cfg(test)] - pub async fn create(&self, name: &str, password: &str) -> Result { - let password_hash = StoredHash::new(password)?; + pub async fn create(&self, name: &str, password: &Password) -> Result { + let password_hash = password.hash()?; let mut tx = self.db.begin().await?; let login = tx.logins().create(name, &password_hash).await?; @@ -59,7 +59,11 @@ impl<'a> Logins<'a> { Ok(login) } - pub async fn validate(&self, secret: &str, used_at: &DateTime) -> Result { + pub async fn validate( + &self, + secret: &IdentitySecret, + used_at: &DateTime, + ) -> Result { let mut tx = self.db.begin().await?; let login = tx .tokens() @@ -82,7 +86,7 @@ impl<'a> Logins<'a> { Ok(()) } - pub async fn logout(&self, secret: &str) -> Result<(), ValidateError> { + pub async fn logout(&self, secret: &IdentitySecret) -> Result<(), ValidateError> { let mut tx = self.db.begin().await?; tx.tokens() .revoke(secret) diff --git a/src/login/extract.rs b/src/login/extract.rs index 5ef454c..3b31d4c 100644 --- a/src/login/extract.rs +++ b/src/login/extract.rs @@ -1,3 +1,5 @@ +use std::fmt; + use axum::{ extract::FromRequestParts, http::request::Parts, @@ -7,11 +9,22 @@ use axum_extra::extract::cookie::{Cookie, CookieJar}; // The usage pattern here - receive the extractor as an argument, return it in // the response - is heavily modelled after CookieJar's own intended usage. -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct IdentityToken { cookies: CookieJar, } +impl fmt::Debug for IdentityToken { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("IdentityToken") + .field( + "identity", + &self.cookies.get(IDENTITY_COOKIE).map(|_| "********"), + ) + .finish() + } +} + impl IdentityToken { // Creates a new, unpopulated identity token store. #[cfg(test)] @@ -26,14 +39,18 @@ impl IdentityToken { // return [None]. If the identity has previously been [set], then this // will return that secret, regardless of what the request originally // included. - pub fn secret(&self) -> Option<&str> { - self.cookies.get(IDENTITY_COOKIE).map(Cookie::value) + pub fn secret(&self) -> Option { + self.cookies + .get(IDENTITY_COOKIE) + .map(Cookie::value) + .map(IdentitySecret::from) } // Positively set the identity secret, and ensure that it will be sent // back to the client when this extractor is included in a response. - pub fn set(self, secret: &str) -> Self { - let identity_cookie = Cookie::build((IDENTITY_COOKIE, String::from(secret))) + pub fn set(self, secret: impl Into) -> Self { + let IdentitySecret(secret) = secret.into(); + let identity_cookie = Cookie::build((IDENTITY_COOKIE, secret)) .http_only(true) .path("/api/") .permanent() @@ -76,3 +93,22 @@ impl IntoResponseParts for IdentityToken { cookies.into_response_parts(res) } } + +#[derive(sqlx::Type)] +#[sqlx(transparent)] +pub struct IdentitySecret(String); + +impl fmt::Debug for IdentitySecret { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("IdentityToken").field(&"********").finish() + } +} + +impl From for IdentitySecret +where + S: Into, +{ + fn from(value: S) -> Self { + Self(value.into()) + } +} diff --git a/src/login/routes.rs b/src/login/routes.rs index 31a68d0..4664063 100644 --- a/src/login/routes.rs +++ b/src/login/routes.rs @@ -6,7 +6,9 @@ use axum::{ Router, }; -use crate::{app::App, clock::RequestedAt, error::Internal, repo::login::Login}; +use crate::{ + app::App, clock::RequestedAt, error::Internal, password::Password, repo::login::Login, +}; use super::{app, extract::IdentityToken}; @@ -38,7 +40,7 @@ impl IntoResponse for Boot { #[derive(serde::Deserialize)] struct LoginRequest { name: String, - password: String, + password: Password, } async fn on_login( @@ -52,7 +54,7 @@ async fn on_login( .login(&request.name, &request.password, &now) .await .map_err(LoginError)?; - let identity = identity.set(&token); + let identity = identity.set(token); Ok((identity, StatusCode::NO_CONTENT)) } @@ -82,7 +84,7 @@ async fn on_logout( Json(LogoutRequest {}): Json, ) -> Result<(IdentityToken, StatusCode), LogoutError> { if let Some(secret) = identity.secret() { - app.logins().logout(secret).await.map_err(LogoutError)?; + app.logins().logout(&secret).await.map_err(LogoutError)?; } let identity = identity.clear(); diff --git a/src/login/routes/test/login.rs b/src/login/routes/test/login.rs index 719ccca..10c17d6 100644 --- a/src/login/routes/test/login.rs +++ b/src/login/routes/test/login.rs @@ -38,7 +38,7 @@ async fn new_identity() { let validated_at = fixtures::now(); let validated = app .logins() - .validate(secret, &validated_at) + .validate(&secret, &validated_at) .await .expect("identity secret is valid"); @@ -75,7 +75,7 @@ async fn existing_identity() { let validated_at = fixtures::now(); let validated_login = app .logins() - .validate(secret, &validated_at) + .validate(&secret, &validated_at) .await .expect("identity secret is valid"); @@ -122,7 +122,7 @@ async fn token_expires() { let (identity, _) = routes::on_login(State(app.clone()), logged_in_at, identity, Json(request)) .await .expect("logged in with valid credentials"); - let token = identity.secret().expect("logged in with valid credentials"); + let secret = identity.secret().expect("logged in with valid credentials"); // Verify the semantics @@ -135,7 +135,7 @@ async fn token_expires() { let verified_at = fixtures::now(); let error = app .logins() - .validate(token, &verified_at) + .validate(&secret, &verified_at) .await .expect_err("validating an expired token"); diff --git a/src/login/routes/test/logout.rs b/src/login/routes/test/logout.rs index 4c09a73..05594be 100644 --- a/src/login/routes/test/logout.rs +++ b/src/login/routes/test/logout.rs @@ -37,7 +37,7 @@ async fn successful() { let error = app .logins() - .validate(secret, &now) + .validate(&secret, &now) .await .expect_err("secret is invalid"); match error { diff --git a/src/password.rs b/src/password.rs index b14f728..da3930f 100644 --- a/src/password.rs +++ b/src/password.rs @@ -1,3 +1,5 @@ +use std::fmt; + use argon2::Argon2; use password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, SaltString}; use rand_core::OsRng; @@ -7,16 +9,7 @@ use rand_core::OsRng; pub struct StoredHash(String); impl StoredHash { - pub fn new(password: &str) -> Result { - let salt = SaltString::generate(&mut OsRng); - let argon2 = Argon2::default(); - let hash = argon2 - .hash_password(password.as_bytes(), &salt)? - .to_string(); - Ok(Self(hash)) - } - - pub fn verify(&self, password: &str) -> Result { + pub fn verify(&self, password: &Password) -> Result { let hash = PasswordHash::new(&self.0)?; match Argon2::default().verify_password(password.as_bytes(), &hash) { @@ -29,3 +22,37 @@ impl StoredHash { } } } + +#[derive(serde::Deserialize)] +#[serde(transparent)] +pub struct Password(String); + +impl Password { + pub fn hash(&self) -> Result { + let Self(password) = self; + let salt = SaltString::generate(&mut OsRng); + let argon2 = Argon2::default(); + let hash = argon2 + .hash_password(password.as_bytes(), &salt)? + .to_string(); + Ok(StoredHash(hash)) + } + + fn as_bytes(&self) -> &[u8] { + let Self(value) = self; + value.as_bytes() + } +} + +impl fmt::Debug for Password { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("Password").field(&"********").finish() + } +} + +#[cfg(test)] +impl From for Password { + fn from(password: String) -> Self { + Self(password) + } +} diff --git a/src/repo/login/extract.rs b/src/repo/login/extract.rs index e5f96d0..c127078 100644 --- a/src/repo/login/extract.rs +++ b/src/repo/login/extract.rs @@ -32,7 +32,7 @@ impl FromRequestParts for Login { let secret = identity_token.secret().ok_or(LoginError::Unauthorized)?; let app = State::::from_request_parts(parts, state).await?; - match app.logins().validate(secret, &used_at).await { + match app.logins().validate(&secret, &used_at).await { Ok(login) => Ok(login), Err(ValidateError::InvalidToken) => Err(LoginError::Unauthorized), Err(other) => Err(other.into()), diff --git a/src/repo/token.rs b/src/repo/token.rs index 8276bea..15eef48 100644 --- a/src/repo/token.rs +++ b/src/repo/token.rs @@ -2,7 +2,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use uuid::Uuid; use super::login::{self, Login}; -use crate::clock::DateTime; +use crate::{clock::DateTime, login::extract::IdentitySecret}; pub trait Provider { fn tokens(&mut self) -> Tokens; @@ -23,7 +23,7 @@ impl<'c> Tokens<'c> { &mut self, login: &Login, issued_at: &DateTime, - ) -> Result { + ) -> Result { let secret = Uuid::new_v4().to_string(); let secret = sqlx::query_scalar!( @@ -31,7 +31,7 @@ impl<'c> Tokens<'c> { insert into token (secret, login, issued_at, last_used_at) values ($1, $2, $3, $3) - returning secret as "secret!" + returning secret as "secret!: IdentitySecret" "#, secret, login.id, @@ -44,7 +44,7 @@ impl<'c> Tokens<'c> { } // Revoke a token by its secret. - pub async fn revoke(&mut self, secret: &str) -> Result<(), sqlx::Error> { + pub async fn revoke(&mut self, secret: &IdentitySecret) -> Result<(), sqlx::Error> { sqlx::query!( r#" delete @@ -82,7 +82,7 @@ impl<'c> Tokens<'c> { // timestamp will be set to `used_at`. pub async fn validate( &mut self, - secret: &str, + secret: &IdentitySecret, used_at: &DateTime, ) -> Result { // I would use `update … returning` to do this in one query, but diff --git a/src/test/fixtures/identity.rs b/src/test/fixtures/identity.rs index 16463aa..69b5f4c 100644 --- a/src/test/fixtures/identity.rs +++ b/src/test/fixtures/identity.rs @@ -1,12 +1,17 @@ use uuid::Uuid; -use crate::{app::App, clock::RequestedAt, login::extract::IdentityToken}; +use crate::{ + app::App, + clock::RequestedAt, + login::extract::{IdentitySecret, IdentityToken}, + password::Password, +}; pub fn not_logged_in() -> IdentityToken { IdentityToken::new() } -pub async fn logged_in(app: &App, login: &(String, String), now: &RequestedAt) -> IdentityToken { +pub async fn logged_in(app: &App, login: &(String, Password), now: &RequestedAt) -> IdentityToken { let (name, password) = login; let token = app .logins() @@ -14,14 +19,14 @@ pub async fn logged_in(app: &App, login: &(String, String), now: &RequestedAt) - .await .expect("should succeed given known-valid credentials"); - IdentityToken::new().set(&token) + IdentityToken::new().set(token) } -pub fn secret(identity: &IdentityToken) -> &str { +pub fn secret(identity: &IdentityToken) -> IdentitySecret { identity.secret().expect("identity contained a secret") } pub fn fictitious() -> IdentityToken { let token = Uuid::new_v4().to_string(); - IdentityToken::new().set(&token) + IdentityToken::new().set(token) } diff --git a/src/test/fixtures/login.rs b/src/test/fixtures/login.rs index f1e4b15..d6a321b 100644 --- a/src/test/fixtures/login.rs +++ b/src/test/fixtures/login.rs @@ -3,10 +3,11 @@ use uuid::Uuid; use crate::{ app::App, + password::Password, repo::login::{self, Login}, }; -pub async fn create_with_password(app: &App) -> (String, String) { +pub async fn create_with_password(app: &App) -> (String, Password) { let (name, password) = propose(); app.logins() .create(&name, &password) @@ -31,7 +32,7 @@ pub fn fictitious() -> Login { } } -pub fn propose() -> (String, String) { +pub fn propose() -> (String, Password) { (name(), propose_password()) } @@ -39,6 +40,6 @@ fn name() -> String { rand::random::().to_string() } -pub fn propose_password() -> String { - Uuid::new_v4().to_string() +pub fn propose_password() -> Password { + Uuid::new_v4().to_string().into() } -- cgit v1.2.3