From eff129bc1f29bcb1b2b9d10c6b49ab886edc83d6 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Sep 2024 18:17:02 -0400 Subject: Make `/api/events` a firehose endpoint. It now includes events for all channels. Clients are responsible for filtering. The schema for channel events has changed; it now includes a channel name and ID, in the same format as the sender's name and ID. They also now include a `"type"` field, whose only valid value (as of this writing) is `"message"`. This is groundwork for delivering message deletion (expiry) events to clients, and notifying clients of channel lifecycle events. --- src/channel/app.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) (limited to 'src/channel/app.rs') diff --git a/src/channel/app.rs b/src/channel/app.rs index 793fa35..6bad158 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,18 +1,14 @@ use sqlx::sqlite::SqlitePool; -use crate::{ - events::broadcaster::Broadcaster, - repo::channel::{Channel, Provider as _}, -}; +use crate::repo::channel::{Channel, Provider as _}; pub struct Channels<'a> { db: &'a SqlitePool, - broadcaster: &'a Broadcaster, } impl<'a> Channels<'a> { - pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { - Self { db, broadcaster } + pub const fn new(db: &'a SqlitePool) -> Self { + Self { db } } pub async fn create(&self, name: &str) -> Result { @@ -22,7 +18,6 @@ impl<'a> Channels<'a> { .create(name) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; - self.broadcaster.register_channel(&channel.id); tx.commit().await?; Ok(channel) -- cgit v1.2.3 From 1458ff7be5d883444943090cb636e9343487d03e Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Sep 2024 20:18:50 -0400 Subject: Send created events when channels are added. --- ...8c64a38a3f73b112e74b7318ee8e52e475866d8cfd.json | 32 ++++++++++++ ...f4f89221feb030ac8f42cf594c875ecd3bfeca3eb7.json | 26 ---------- ...926096285d50afb88a326cff0ecab96058a2f6d93a.json | 32 ++++++++++++ ...2b3d9939d4edb10e0e654e2a1b19949c3427522a08.json | 26 ---------- ...b3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json | 32 ++++++++++++ ...d76f0c34c759006d269ccccd6299c66b672076449d.json | 26 ---------- docs/api.md | 10 ++++ migrations/20240928002608_channel_lifecycle.sql | 57 ++++++++++++++++++++++ src/app.rs | 2 +- src/channel/app.rs | 18 +++++-- src/channel/routes.rs | 3 +- src/channel/routes/test/list.rs | 6 +-- src/channel/routes/test/on_create.rs | 47 +++++++++++++----- src/channel/routes/test/on_send.rs | 2 +- src/events/app.rs | 24 ++++++--- src/events/routes/test.rs | 18 +++---- src/events/types.rs | 40 ++++++++++++++- src/repo/channel.rs | 30 +++++++++--- src/test/fixtures/channel.rs | 6 +-- 19 files changed, 309 insertions(+), 128 deletions(-) create mode 100644 .sqlx/query-22f313d9afcdd02df74a8b8c64a38a3f73b112e74b7318ee8e52e475866d8cfd.json delete mode 100644 .sqlx/query-79e6e76bc3f974248bf8a7f4f89221feb030ac8f42cf594c875ecd3bfeca3eb7.json create mode 100644 .sqlx/query-7ccae3dde1aba5f22cf9e3926096285d50afb88a326cff0ecab96058a2f6d93a.json delete mode 100644 .sqlx/query-8c78f7bbfb5522afa15c412b3d9939d4edb10e0e654e2a1b19949c3427522a08.json create mode 100644 .sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json delete mode 100644 .sqlx/query-dbe468d2a7f64a45e70dfbd76f0c34c759006d269ccccd6299c66b672076449d.json create mode 100644 migrations/20240928002608_channel_lifecycle.sql (limited to 'src/channel/app.rs') diff --git a/.sqlx/query-22f313d9afcdd02df74a8b8c64a38a3f73b112e74b7318ee8e52e475866d8cfd.json b/.sqlx/query-22f313d9afcdd02df74a8b8c64a38a3f73b112e74b7318ee8e52e475866d8cfd.json new file mode 100644 index 0000000..3d5d06c --- /dev/null +++ b/.sqlx/query-22f313d9afcdd02df74a8b8c64a38a3f73b112e74b7318ee8e52e475866d8cfd.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\"\n from channel\n where id = $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "22f313d9afcdd02df74a8b8c64a38a3f73b112e74b7318ee8e52e475866d8cfd" +} diff --git a/.sqlx/query-79e6e76bc3f974248bf8a7f4f89221feb030ac8f42cf594c875ecd3bfeca3eb7.json b/.sqlx/query-79e6e76bc3f974248bf8a7f4f89221feb030ac8f42cf594c875ecd3bfeca3eb7.json deleted file mode 100644 index b46d940..0000000 --- a/.sqlx/query-79e6e76bc3f974248bf8a7f4f89221feb030ac8f42cf594c875ecd3bfeca3eb7.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select id as \"id: Id\", name\n from channel\n where id = $1\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "name", - "ordinal": 1, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false - ] - }, - "hash": "79e6e76bc3f974248bf8a7f4f89221feb030ac8f42cf594c875ecd3bfeca3eb7" -} diff --git a/.sqlx/query-7ccae3dde1aba5f22cf9e3926096285d50afb88a326cff0ecab96058a2f6d93a.json b/.sqlx/query-7ccae3dde1aba5f22cf9e3926096285d50afb88a326cff0ecab96058a2f6d93a.json new file mode 100644 index 0000000..4ec7118 --- /dev/null +++ b/.sqlx/query-7ccae3dde1aba5f22cf9e3926096285d50afb88a326cff0ecab96058a2f6d93a.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\"\n from channel\n order by channel.name\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "7ccae3dde1aba5f22cf9e3926096285d50afb88a326cff0ecab96058a2f6d93a" +} diff --git a/.sqlx/query-8c78f7bbfb5522afa15c412b3d9939d4edb10e0e654e2a1b19949c3427522a08.json b/.sqlx/query-8c78f7bbfb5522afa15c412b3d9939d4edb10e0e654e2a1b19949c3427522a08.json deleted file mode 100644 index 4d9051d..0000000 --- a/.sqlx/query-8c78f7bbfb5522afa15c412b3d9939d4edb10e0e654e2a1b19949c3427522a08.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n channel.id as \"id: Id\",\n channel.name\n from channel\n order by channel.name\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "name", - "ordinal": 1, - "type_info": "Text" - } - ], - "parameters": { - "Right": 0 - }, - "nullable": [ - false, - false - ] - }, - "hash": "8c78f7bbfb5522afa15c412b3d9939d4edb10e0e654e2a1b19949c3427522a08" -} diff --git a/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json b/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json new file mode 100644 index 0000000..64d56dd --- /dev/null +++ b/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n insert\n into channel (id, name, created_at)\n values ($1, $2, $3)\n returning\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 3 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33" +} diff --git a/.sqlx/query-dbe468d2a7f64a45e70dfbd76f0c34c759006d269ccccd6299c66b672076449d.json b/.sqlx/query-dbe468d2a7f64a45e70dfbd76f0c34c759006d269ccccd6299c66b672076449d.json deleted file mode 100644 index 3db94ca..0000000 --- a/.sqlx/query-dbe468d2a7f64a45e70dfbd76f0c34c759006d269ccccd6299c66b672076449d.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert\n into channel (id, name)\n values ($1, $2)\n returning id as \"id: Id\", name\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "name", - "ordinal": 1, - "type_info": "Text" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - false, - false - ] - }, - "hash": "dbe468d2a7f64a45e70dfbd76f0c34c759006d269ccccd6299c66b672076449d" -} diff --git a/docs/api.md b/docs/api.md index 8b31941..c5ee34a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -165,6 +165,16 @@ The event IDs `hi` sends in `application/event-stream` encoding are ephemeral, a The returned event stream is a sequence of events: ```json +id: 1233 +data: { +data: "type": "created" +data: "at": "2024-09-27T23:18:10.208147Z", +data: "channel": { +data: "id": "C9876cyyz", +data: "name": "example channel 2" +data: }, +data: } + id: 1234 data: { data: "type": "message", diff --git a/migrations/20240928002608_channel_lifecycle.sql b/migrations/20240928002608_channel_lifecycle.sql new file mode 100644 index 0000000..bc690d7 --- /dev/null +++ b/migrations/20240928002608_channel_lifecycle.sql @@ -0,0 +1,57 @@ +alter table channel +rename to old_channel; + +-- Add new columns +create table channel ( + id text + not null + primary key, + name text + not null + unique, + created_at text + not null +); + +-- Transfer data from original table +insert into channel +select + channel.id, + channel.name, + coalesce( + min(message.sent_at), + strftime('%FT%R:%f+00:00', 'now', 'utc') + ) as created_at +from old_channel as channel + left join message on channel.id = message.channel +group by channel.id, channel.name; + +-- Fix up `message` foreign keys +alter table message +rename to old_message; + +create table message ( + id text + not null + primary key, + sequence bigint + not null, + channel text + not null + references channel (id), + sender text + not null + references login (id), + body text + not null, + sent_at text + not null, + unique (channel, sequence) +); + +insert into message +select * from old_message; + +-- Bury the bodies respectfully +drop table old_message; +drop table old_channel; diff --git a/src/app.rs b/src/app.rs index 07b932a..245feb1 100644 --- a/src/app.rs +++ b/src/app.rs @@ -29,6 +29,6 @@ impl App { } pub const fn channels(&self) -> Channels { - Channels::new(&self.db) + Channels::new(&self.db, &self.broadcaster) } } diff --git a/src/channel/app.rs b/src/channel/app.rs index 6bad158..1eeca79 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,25 +1,33 @@ use sqlx::sqlite::SqlitePool; -use crate::repo::channel::{Channel, Provider as _}; +use crate::{ + clock::DateTime, + events::{broadcaster::Broadcaster, types::ChannelEvent}, + repo::channel::{Channel, Provider as _}, +}; pub struct Channels<'a> { db: &'a SqlitePool, + broadcaster: &'a Broadcaster, } impl<'a> Channels<'a> { - pub const fn new(db: &'a SqlitePool) -> Self { - Self { db } + pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { + Self { db, broadcaster } } - pub async fn create(&self, name: &str) -> Result { + pub async fn create(&self, name: &str, created_at: &DateTime) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() - .create(name) + .create(name, created_at) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; + self.broadcaster + .broadcast(&ChannelEvent::created(channel.clone())); + Ok(channel) } diff --git a/src/channel/routes.rs b/src/channel/routes.rs index f524e62..1f8db5a 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -52,11 +52,12 @@ struct CreateRequest { async fn on_create( State(app): State, _: Login, // requires auth, but doesn't actually care who you are + RequestedAt(created_at): RequestedAt, Json(form): Json, ) -> Result, CreateError> { let channel = app .channels() - .create(&form.name) + .create(&form.name, &created_at) .await .map_err(CreateError)?; diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs index f7f7b44..bc94024 100644 --- a/src/channel/routes/test/list.rs +++ b/src/channel/routes/test/list.rs @@ -26,7 +26,7 @@ async fn one_channel() { let app = fixtures::scratch_app().await; let viewer = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint @@ -46,8 +46,8 @@ async fn multiple_channels() { let app = fixtures::scratch_app().await; let viewer = fixtures::login::create(&app).await; let channels = vec![ - fixtures::channel::create(&app).await, - fixtures::channel::create(&app).await, + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, ]; // Call the endpoint diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 23885c0..bb6697f 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -1,8 +1,10 @@ use axum::extract::{Json, State}; +use futures::{future, stream::StreamExt as _}; use crate::{ channel::{app, routes}, - test::fixtures, + events::types, + test::fixtures::{self, future::Immediately as _}, }; #[tokio::test] @@ -16,10 +18,14 @@ async fn new_channel() { let name = fixtures::channel::propose(); let request = routes::CreateRequest { name }; - let Json(response_channel) = - routes::on_create(State(app.clone()), creator, Json(request.clone())) - .await - .expect("new channel in an empty app"); + let Json(response_channel) = routes::on_create( + State(app.clone()), + creator, + fixtures::now(), + Json(request.clone()), + ) + .await + .expect("new channel in an empty app"); // Verify the structure of the response @@ -28,8 +34,23 @@ async fn new_channel() { // Verify the semantics let channels = app.channels().all().await.expect("always succeeds"); - assert!(channels.contains(&response_channel)); + + let mut events = app + .events() + .subscribe(&fixtures::now(), types::ResumePoint::default()) + .await + .expect("subscribing never fails") + .filter(|types::ResumableEvent(_, event)| future::ready(event.channel == response_channel)); + + let types::ResumableEvent(_, event) = events + .next() + .immediately() + .await + .expect("creation event published"); + + assert_eq!(types::Sequence::default(), event.sequence); + assert_eq!(types::ChannelEventData::Created, event.data); } #[tokio::test] @@ -38,15 +59,19 @@ async fn duplicate_name() { let app = fixtures::scratch_app().await; let creator = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint let request = routes::CreateRequest { name: channel.name }; - let routes::CreateError(error) = - routes::on_create(State(app.clone()), creator, Json(request.clone())) - .await - .expect_err("duplicate channel name"); + let routes::CreateError(error) = routes::on_create( + State(app.clone()), + creator, + fixtures::now(), + Json(request.clone()), + ) + .await + .expect_err("duplicate channel name"); // Verify the structure of the response diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 5d87bdc..e4de0f1 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -14,7 +14,7 @@ async fn messages_in_order() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint (twice) diff --git a/src/events/app.rs b/src/events/app.rs index 043a29b..134e86a 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -11,7 +11,7 @@ use sqlx::sqlite::SqlitePool; use super::{ broadcaster::Broadcaster, repo::message::Provider as _, - types::{self, ResumePoint}, + types::{self, ChannelEvent, ResumePoint}, }; use crate::{ clock::DateTime, @@ -66,6 +66,17 @@ impl<'a> Events<'a> { let mut tx = self.db.begin().await?; let channels = tx.channels().all().await?; + let created_events = { + let resume_at = resume_at.clone(); + let channels = channels.clone(); + stream::iter( + channels + .into_iter() + .map(ChannelEvent::created) + .filter(move |event| resume_at.not_after(event)), + ) + }; + // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.broadcaster.subscribe(); @@ -104,9 +115,9 @@ impl<'a> Events<'a> { // stored_messages. .filter(Self::resume(resume_live_at)); - Ok(replay - .chain(live_messages) - .scan(resume_at, |resume_point, event| { + Ok(created_events.chain(replay).chain(live_messages).scan( + resume_at, + |resume_point, event| { let channel = &event.channel.id; let sequence = event.sequence; resume_point.advance(channel, sequence); @@ -114,13 +125,14 @@ impl<'a> Events<'a> { let event = types::ResumableEvent(resume_point.clone(), event); future::ready(Some(event)) - })) + }, + )) } fn resume( resume_at: ResumePoint, ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready { - move |event| future::ready(resume_at < event.sequence()) + move |event| future::ready(resume_at.not_after(event)) } fn skip_expired( expire_at: &DateTime, diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index f289225..55ada95 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -15,7 +15,7 @@ async fn includes_historical_message() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; // Call the endpoint @@ -42,7 +42,7 @@ async fn includes_live_message() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint @@ -75,8 +75,8 @@ async fn includes_multiple_channels() { let sender = fixtures::login::create(&app).await; let channels = [ - fixtures::channel::create(&app).await, - fixtures::channel::create(&app).await, + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, ]; let messages = stream::iter(channels) @@ -117,7 +117,7 @@ async fn sequential_messages() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; let messages = vec![ @@ -156,7 +156,7 @@ async fn resumes_from() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; @@ -229,8 +229,8 @@ async fn serial_resume() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel_a = fixtures::channel::create(&app).await; - let channel_b = fixtures::channel::create(&app).await; + let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; + let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint @@ -346,7 +346,7 @@ async fn removes_expired_messages() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; diff --git a/src/events/types.rs b/src/events/types.rs index 6747afc..7c0e0a4 100644 --- a/src/events/types.rs +++ b/src/events/types.rs @@ -11,6 +11,7 @@ use crate::{ #[derive( Debug, + Default, Eq, Ord, PartialEq, @@ -59,7 +60,30 @@ impl ResumePoint { let Self(elements) = self; elements.get(channel).copied() } + + pub fn not_after(&self, event: impl ResumeElement) -> bool { + let Self(elements) = self; + let (channel, sequence) = event.element(); + + elements + .get(channel) + .map_or(true, |resume_at| resume_at < &sequence) + } } + +pub trait ResumeElement { + fn element(&self) -> (&channel::Id, Sequence); +} + +impl ResumeElement for &T +where + T: ResumeElement, +{ + fn element(&self) -> (&channel::Id, Sequence) { + (*self).element() + } +} + #[derive(Clone, Debug)] pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent); @@ -74,14 +98,26 @@ pub struct ChannelEvent { } impl ChannelEvent { - pub fn sequence(&self) -> ResumePoint { - ResumePoint::singleton(&self.channel.id, self.sequence) + pub fn created(channel: Channel) -> Self { + Self { + at: channel.created_at, + sequence: Sequence::default(), + channel, + data: ChannelEventData::Created, + } + } +} + +impl ResumeElement for ChannelEvent { + fn element(&self) -> (&channel::Id, Sequence) { + (&self.channel.id, self.sequence) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum ChannelEventData { + Created, Message(MessageEvent), } diff --git a/src/repo/channel.rs b/src/repo/channel.rs index d223dab..e85b898 100644 --- a/src/repo/channel.rs +++ b/src/repo/channel.rs @@ -2,7 +2,7 @@ use std::fmt; use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use crate::id::Id as BaseId; +use crate::{clock::DateTime, id::Id as BaseId}; pub trait Provider { fn channels(&mut self) -> Channels; @@ -20,22 +20,32 @@ pub struct Channels<'t>(&'t mut SqliteConnection); pub struct Channel { pub id: Id, pub name: String, + #[serde(skip)] + pub created_at: DateTime, } impl<'c> Channels<'c> { - pub async fn create(&mut self, name: &str) -> Result { + pub async fn create( + &mut self, + name: &str, + created_at: &DateTime, + ) -> Result { let id = Id::generate(); let channel = sqlx::query_as!( Channel, r#" insert - into channel (id, name) - values ($1, $2) - returning id as "id: Id", name + into channel (id, name, created_at) + values ($1, $2, $3) + returning + id as "id: Id", + name, + created_at as "created_at: DateTime" "#, id, name, + created_at, ) .fetch_one(&mut *self.0) .await?; @@ -47,7 +57,10 @@ impl<'c> Channels<'c> { let channel = sqlx::query_as!( Channel, r#" - select id as "id: Id", name + select + id as "id: Id", + name, + created_at as "created_at: DateTime" from channel where id = $1 "#, @@ -64,8 +77,9 @@ impl<'c> Channels<'c> { Channel, r#" select - channel.id as "id: Id", - channel.name + id as "id: Id", + name, + created_at as "created_at: DateTime" from channel order by channel.name "#, diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs index 0558395..8744470 100644 --- a/src/test/fixtures/channel.rs +++ b/src/test/fixtures/channel.rs @@ -4,12 +4,12 @@ use faker_rand::{ }; use rand; -use crate::{app::App, repo::channel::Channel}; +use crate::{app::App, clock::RequestedAt, repo::channel::Channel}; -pub async fn create(app: &App) -> Channel { +pub async fn create(app: &App, created_at: &RequestedAt) -> Channel { let name = propose(); app.channels() - .create(&name) + .create(&name, created_at) .await .expect("should always succeed if the channel is actually new") } -- cgit v1.2.3 From 155f6f2556b21e6b25afe096b19adcde1255c598 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Sep 2024 23:46:55 -0400 Subject: Expire channels, too. --- ...aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json | 32 +++++++++++++ ...6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json | 20 ++++++++ docs/api.md | 21 ++++++++- src/channel/app.rs | 29 +++++++++++- src/channel/routes/test/on_create.rs | 10 ++-- src/cli.rs | 4 +- src/events/app.rs | 11 +++-- src/events/expire.rs | 18 ------- src/events/mod.rs | 1 - src/events/repo/message.rs | 11 ++--- src/events/types.rs | 55 ++++++++++++++++++---- src/expire.rs | 20 ++++++++ src/lib.rs | 1 + src/login/app.rs | 15 ++++-- src/login/routes/test/login.rs | 6 +++ src/repo/channel.rs | 52 +++++++++++++++++++- src/test/fixtures/filter.rs | 6 +++ 17 files changed, 262 insertions(+), 50 deletions(-) create mode 100644 .sqlx/query-6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json create mode 100644 .sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json delete mode 100644 src/events/expire.rs create mode 100644 src/expire.rs (limited to 'src/channel/app.rs') diff --git a/.sqlx/query-6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json b/.sqlx/query-6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json new file mode 100644 index 0000000..ae298d6 --- /dev/null +++ b/.sqlx/query-6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"id: Id\",\n channel.name,\n channel.created_at as \"created_at: DateTime\"\n from channel\n left join message\n where created_at < $1\n and message.id is null\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259" +} diff --git a/.sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json b/.sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json new file mode 100644 index 0000000..1d448d4 --- /dev/null +++ b/.sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n delete from channel\n where id = $1\n returning 1 as \"row: i64\"\n ", + "describe": { + "columns": [ + { + "name": "row: i64", + "ordinal": 0, + "type_info": "Null" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + null + ] + }, + "hash": "d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb" +} diff --git a/docs/api.md b/docs/api.md index 9d803ad..e18c6d5 100644 --- a/docs/api.md +++ b/docs/api.md @@ -188,8 +188,27 @@ data: "id": "L1234abcd", data: "name": "example username" data: }, data: "message": { -data: "id": "Mxnjcf3y41prfry9", +data: "id": "M1312acab", data: "body": "beep" data: } data: } + +id: 1235 +data: { +data: "at": "2024-09-28T02:44:27.077355Z", +data: "channel": { +data: "id": "C9876cyyz", +data: "name": "example channel 2" +data: }, +data: "type": "message_deleted", +data: "message": "M1312acab" +data: } + +id: 1236 +data: { +data: "at": "2024-09-28T03:40:25.384318Z", +data: "type": "deleted", +data: "channel": "C9876cyyz" +data: } + ``` diff --git a/src/channel/app.rs b/src/channel/app.rs index 1eeca79..d7312e4 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,8 +1,9 @@ +use chrono::TimeDelta; use sqlx::sqlite::SqlitePool; use crate::{ clock::DateTime, - events::{broadcaster::Broadcaster, types::ChannelEvent}, + events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent}, repo::channel::{Channel, Provider as _}, }; @@ -38,6 +39,32 @@ impl<'a> Channels<'a> { Ok(channels) } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + let expired = tx.channels().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for channel in expired { + let sequence = tx.message_events().assign_sequence(&channel).await?; + let event = tx + .channels() + .delete_expired(&channel, sequence, relative_to) + .await?; + events.push(event); + } + + tx.commit().await?; + + for event in events { + self.broadcaster.broadcast(&event); + } + + Ok(()) + } } #[derive(Debug, thiserror::Error)] diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 5e62d7f..e2610a5 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -1,5 +1,5 @@ use axum::extract::{Json, State}; -use futures::{future, stream::StreamExt as _}; +use futures::stream::StreamExt as _; use crate::{ channel::{app, routes}, @@ -41,7 +41,7 @@ async fn new_channel() { .subscribe(types::ResumePoint::default()) .await .expect("subscribing never fails") - .filter(|types::ResumableEvent(_, event)| future::ready(event.channel == response_channel)); + .filter(fixtures::filter::created()); let types::ResumableEvent(_, event) = events .next() @@ -50,7 +50,11 @@ async fn new_channel() { .expect("creation event published"); assert_eq!(types::Sequence::default(), event.sequence); - assert_eq!(types::ChannelEventData::Created, event.data); + assert!(matches!( + event.data, + types::ChannelEventData::Created(event) + if event.channel == response_channel + )); } #[tokio::test] diff --git a/src/cli.rs b/src/cli.rs index 472d68f..132baf8 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -10,7 +10,7 @@ use clap::Parser; use sqlx::sqlite::SqlitePool; use tokio::net; -use crate::{app::App, channel, clock, events, login, repo::pool}; +use crate::{app::App, channel, clock, events, expire, login, repo::pool}; /// Command-line entry point for running the `hi` server. /// @@ -74,7 +74,7 @@ impl Args { let app = routers() .route_layer(middleware::from_fn_with_state( app.clone(), - events::expire::middleware, + expire::middleware, )) .route_layer(middleware::from_fn(clock::middleware)) .with_state(app); diff --git a/src/events/app.rs b/src/events/app.rs index 03f3ee6..5162c67 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -64,9 +64,10 @@ impl<'a> Events<'a> { let mut events = Vec::with_capacity(expired.len()); for (channel, message) in expired { + let sequence = tx.message_events().assign_sequence(&channel).await?; let event = tx .message_events() - .delete_expired(&channel, &message, relative_to) + .delete_expired(&channel, &message, sequence, relative_to) .await?; events.push(event); } @@ -134,9 +135,11 @@ impl<'a> Events<'a> { Ok(created_events.chain(replay).chain(live_messages).scan( resume_at, |resume_point, event| { - let channel = &event.channel.id; - let sequence = event.sequence; - resume_point.advance(channel, sequence); + let channel = &event.channel_id(); + match event.data { + types::ChannelEventData::Deleted(_) => resume_point.forget(channel), + _ => resume_point.advance(channel, event.sequence), + } let event = types::ResumableEvent(resume_point.clone(), event); diff --git a/src/events/expire.rs b/src/events/expire.rs deleted file mode 100644 index d92142d..0000000 --- a/src/events/expire.rs +++ /dev/null @@ -1,18 +0,0 @@ -use axum::{ - extract::{Request, State}, - middleware::Next, - response::Response, -}; - -use crate::{app::App, clock::RequestedAt, error::Internal}; - -// Expires messages and channels before each request. -pub async fn middleware( - State(app): State, - RequestedAt(expired_at): RequestedAt, - req: Request, - next: Next, -) -> Result { - app.events().expire(&expired_at).await?; - Ok(next.run(req).await) -} diff --git a/src/events/mod.rs b/src/events/mod.rs index 86bc5e9..711ae64 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1,6 +1,5 @@ pub mod app; pub mod broadcaster; -pub mod expire; mod extract; pub mod repo; mod routes; diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs index 32419d5..f8bae2b 100644 --- a/src/events/repo/message.rs +++ b/src/events/repo/message.rs @@ -56,8 +56,8 @@ impl<'c> Events<'c> { .map(|row| types::ChannelEvent { sequence: row.sequence, at: row.sent_at, - channel: channel.clone(), data: types::MessageEvent { + channel: channel.clone(), sender: sender.clone(), message: Message { id: row.id, @@ -72,7 +72,7 @@ impl<'c> Events<'c> { Ok(message) } - async fn assign_sequence(&mut self, channel: &Channel) -> Result { + pub async fn assign_sequence(&mut self, channel: &Channel) -> Result { let next = sqlx::query_scalar!( r#" update channel @@ -92,10 +92,9 @@ impl<'c> Events<'c> { &mut self, channel: &Channel, message: &message::Id, + sequence: Sequence, deleted_at: &DateTime, ) -> Result { - let sequence = self.assign_sequence(channel).await?; - sqlx::query_scalar!( r#" delete from message @@ -110,8 +109,8 @@ impl<'c> Events<'c> { Ok(types::ChannelEvent { sequence, at: *deleted_at, - channel: channel.clone(), data: types::MessageDeletedEvent { + channel: channel.clone(), message: message.clone(), } .into(), @@ -178,8 +177,8 @@ impl<'c> Events<'c> { .map(|row| types::ChannelEvent { sequence: row.sequence, at: row.sent_at, - channel: channel.clone(), data: types::MessageEvent { + channel: channel.clone(), sender: login::Login { id: row.sender_id, name: row.sender_name, diff --git a/src/events/types.rs b/src/events/types.rs index 9a65207..966842d 100644 --- a/src/events/types.rs +++ b/src/events/types.rs @@ -56,6 +56,11 @@ impl ResumePoint { elements.insert(channel.clone(), sequence); } + pub fn forget(&mut self, channel: &channel::Id) { + let Self(elements) = self; + elements.remove(channel); + } + pub fn get(&self, channel: &channel::Id) -> Option { let Self(elements) = self; elements.get(channel).copied() @@ -92,7 +97,6 @@ pub struct ChannelEvent { #[serde(skip)] pub sequence: Sequence, pub at: DateTime, - pub channel: Channel, #[serde(flatten)] pub data: ChannelEventData, } @@ -102,45 +106,78 @@ impl ChannelEvent { Self { at: channel.created_at, sequence: Sequence::default(), - channel, - data: ChannelEventData::Created, + data: CreatedEvent { channel }.into(), + } + } + + pub fn channel_id(&self) -> &channel::Id { + match &self.data { + ChannelEventData::Created(event) => &event.channel.id, + ChannelEventData::Message(event) => &event.channel.id, + ChannelEventData::MessageDeleted(event) => &event.channel.id, + ChannelEventData::Deleted(event) => &event.channel, } } } impl ResumeElement for ChannelEvent { fn element(&self) -> (&channel::Id, Sequence) { - (&self.channel.id, self.sequence) + (self.channel_id(), self.sequence) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum ChannelEventData { - Created, + Created(CreatedEvent), Message(MessageEvent), MessageDeleted(MessageDeletedEvent), + Deleted(DeletedEvent), +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct CreatedEvent { + pub channel: Channel, +} + +impl From for ChannelEventData { + fn from(event: CreatedEvent) -> Self { + Self::Created(event) + } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct MessageEvent { + pub channel: Channel, pub sender: Login, pub message: message::Message, } impl From for ChannelEventData { - fn from(message: MessageEvent) -> Self { - Self::Message(message) + fn from(event: MessageEvent) -> Self { + Self::Message(event) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct MessageDeletedEvent { + pub channel: Channel, pub message: message::Id, } impl From for ChannelEventData { - fn from(message: MessageDeletedEvent) -> Self { - Self::MessageDeleted(message) + fn from(event: MessageDeletedEvent) -> Self { + Self::MessageDeleted(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct DeletedEvent { + pub channel: channel::Id, +} + +impl From for ChannelEventData { + fn from(event: DeletedEvent) -> Self { + Self::Deleted(event) } } diff --git a/src/expire.rs b/src/expire.rs new file mode 100644 index 0000000..16006d1 --- /dev/null +++ b/src/expire.rs @@ -0,0 +1,20 @@ +use axum::{ + extract::{Request, State}, + middleware::Next, + response::Response, +}; + +use crate::{app::App, clock::RequestedAt, error::Internal}; + +// Expires messages and channels before each request. +pub async fn middleware( + State(app): State, + RequestedAt(expired_at): RequestedAt, + req: Request, + next: Next, +) -> Result { + app.logins().expire(&expired_at).await?; + app.events().expire(&expired_at).await?; + app.channels().expire(&expired_at).await?; + Ok(next.run(req).await) +} diff --git a/src/lib.rs b/src/lib.rs index f731e57..4139d4d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ pub mod cli; mod clock; mod error; mod events; +mod expire; mod id; mod login; mod password; diff --git a/src/login/app.rs b/src/login/app.rs index 10609c6..292b95f 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -60,11 +60,7 @@ impl<'a> Logins<'a> { } pub async fn validate(&self, secret: &str, used_at: &DateTime) -> Result { - // Somewhat arbitrarily, expire after 7 days. - let expire_at = used_at.to_owned() - TimeDelta::days(7); - let mut tx = self.db.begin().await?; - tx.tokens().expire(&expire_at).await?; let login = tx .tokens() .validate(secret, used_at) @@ -75,6 +71,17 @@ impl<'a> Logins<'a> { Ok(login) } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 7 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + tx.tokens().expire(&expire_at).await?; + tx.commit().await?; + + Ok(()) + } + pub async fn logout(&self, secret: &str) -> Result<(), ValidateError> { let mut tx = self.db.begin().await?; tx.tokens() diff --git a/src/login/routes/test/login.rs b/src/login/routes/test/login.rs index d92c01b..719ccca 100644 --- a/src/login/routes/test/login.rs +++ b/src/login/routes/test/login.rs @@ -126,6 +126,12 @@ async fn token_expires() { // Verify the semantics + let expired_at = fixtures::now(); + app.logins() + .expire(&expired_at) + .await + .expect("expiring tokens never fails"); + let verified_at = fixtures::now(); let error = app .logins() diff --git a/src/repo/channel.rs b/src/repo/channel.rs index 6514426..3c7468f 100644 --- a/src/repo/channel.rs +++ b/src/repo/channel.rs @@ -2,7 +2,11 @@ use std::fmt; use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use crate::{clock::DateTime, events::types::Sequence, id::Id as BaseId}; +use crate::{ + clock::DateTime, + events::types::{self, Sequence}, + id::Id as BaseId, +}; pub trait Provider { fn channels(&mut self) -> Channels; @@ -91,6 +95,52 @@ impl<'c> Channels<'c> { Ok(channels) } + + pub async fn delete_expired( + &mut self, + channel: &Channel, + sequence: Sequence, + deleted_at: &DateTime, + ) -> Result { + let channel = channel.id.clone(); + sqlx::query_scalar!( + r#" + delete from channel + where id = $1 + returning 1 as "row: i64" + "#, + channel, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(types::ChannelEvent { + sequence, + at: *deleted_at, + data: types::DeletedEvent { channel }.into(), + }) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { + let channels = sqlx::query_as!( + Channel, + r#" + select + channel.id as "id: Id", + channel.name, + channel.created_at as "created_at: DateTime" + from channel + left join message + where created_at < $1 + and message.id is null + "#, + expired_at, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } } // Stable identifier for a [Channel]. Prefixed with `C`. diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs index 8847e13..fbebced 100644 --- a/src/test/fixtures/filter.rs +++ b/src/test/fixtures/filter.rs @@ -7,3 +7,9 @@ pub fn messages() -> impl FnMut(&types::ResumableEvent) -> future::Ready { future::ready(matches!(event.data, types::ChannelEventData::Message(_))) } } + +pub fn created() -> impl FnMut(&types::ResumableEvent) -> future::Ready { + |types::ResumableEvent(_, event)| { + future::ready(matches!(event.data, types::ChannelEventData::Created(_))) + } +} -- cgit v1.2.3