From 60b711c844f8624348d5d1dac3a625532a8e2a82 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Sep 2024 23:03:46 -0400 Subject: Delete expired messages out of band. Trying to reliably do expiry mid-request was causing some anomalies: * Creating a channel with a dup name would fail, then succeed after listing channels. It was very hard to reason about which operations needed to trigger expiry, to fix this "correctly," so now expiry runs on every request. --- src/test/fixtures/filter.rs | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 src/test/fixtures/filter.rs (limited to 'src/test/fixtures/filter.rs') diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs new file mode 100644 index 0000000..8847e13 --- /dev/null +++ b/src/test/fixtures/filter.rs @@ -0,0 +1,9 @@ +use futures::future; + +use crate::events::types; + +pub fn messages() -> impl FnMut(&types::ResumableEvent) -> future::Ready { + |types::ResumableEvent(_, event)| { + future::ready(matches!(event.data, types::ChannelEventData::Message(_))) + } +} -- cgit v1.2.3 From 155f6f2556b21e6b25afe096b19adcde1255c598 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 27 Sep 2024 23:46:55 -0400 Subject: Expire channels, too. --- ...aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json | 32 +++++++++++++ ...6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json | 20 ++++++++ docs/api.md | 21 ++++++++- src/channel/app.rs | 29 +++++++++++- src/channel/routes/test/on_create.rs | 10 ++-- src/cli.rs | 4 +- src/events/app.rs | 11 +++-- src/events/expire.rs | 18 ------- src/events/mod.rs | 1 - src/events/repo/message.rs | 11 ++--- src/events/types.rs | 55 ++++++++++++++++++---- src/expire.rs | 20 ++++++++ src/lib.rs | 1 + src/login/app.rs | 15 ++++-- src/login/routes/test/login.rs | 6 +++ src/repo/channel.rs | 52 +++++++++++++++++++- src/test/fixtures/filter.rs | 6 +++ 17 files changed, 262 insertions(+), 50 deletions(-) create mode 100644 .sqlx/query-6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json create mode 100644 .sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json delete mode 100644 src/events/expire.rs create mode 100644 src/expire.rs (limited to 'src/test/fixtures/filter.rs') diff --git a/.sqlx/query-6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json b/.sqlx/query-6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json new file mode 100644 index 0000000..ae298d6 --- /dev/null +++ b/.sqlx/query-6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"id: Id\",\n channel.name,\n channel.created_at as \"created_at: DateTime\"\n from channel\n left join message\n where created_at < $1\n and message.id is null\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "6a782686e163e65f5e03e4aaf423b1fd14ed9e252d7d9c5323feafb0b9159259" +} diff --git a/.sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json b/.sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json new file mode 100644 index 0000000..1d448d4 --- /dev/null +++ b/.sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n delete from channel\n where id = $1\n returning 1 as \"row: i64\"\n ", + "describe": { + "columns": [ + { + "name": "row: i64", + "ordinal": 0, + "type_info": "Null" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + null + ] + }, + "hash": "d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb" +} diff --git a/docs/api.md b/docs/api.md index 9d803ad..e18c6d5 100644 --- a/docs/api.md +++ b/docs/api.md @@ -188,8 +188,27 @@ data: "id": "L1234abcd", data: "name": "example username" data: }, data: "message": { -data: "id": "Mxnjcf3y41prfry9", +data: "id": "M1312acab", data: "body": "beep" data: } data: } + +id: 1235 +data: { +data: "at": "2024-09-28T02:44:27.077355Z", +data: "channel": { +data: "id": "C9876cyyz", +data: "name": "example channel 2" +data: }, +data: "type": "message_deleted", +data: "message": "M1312acab" +data: } + +id: 1236 +data: { +data: "at": "2024-09-28T03:40:25.384318Z", +data: "type": "deleted", +data: "channel": "C9876cyyz" +data: } + ``` diff --git a/src/channel/app.rs b/src/channel/app.rs index 1eeca79..d7312e4 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,8 +1,9 @@ +use chrono::TimeDelta; use sqlx::sqlite::SqlitePool; use crate::{ clock::DateTime, - events::{broadcaster::Broadcaster, types::ChannelEvent}, + events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent}, repo::channel::{Channel, Provider as _}, }; @@ -38,6 +39,32 @@ impl<'a> Channels<'a> { Ok(channels) } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + let expired = tx.channels().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for channel in expired { + let sequence = tx.message_events().assign_sequence(&channel).await?; + let event = tx + .channels() + .delete_expired(&channel, sequence, relative_to) + .await?; + events.push(event); + } + + tx.commit().await?; + + for event in events { + self.broadcaster.broadcast(&event); + } + + Ok(()) + } } #[derive(Debug, thiserror::Error)] diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 5e62d7f..e2610a5 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -1,5 +1,5 @@ use axum::extract::{Json, State}; -use futures::{future, stream::StreamExt as _}; +use futures::stream::StreamExt as _; use crate::{ channel::{app, routes}, @@ -41,7 +41,7 @@ async fn new_channel() { .subscribe(types::ResumePoint::default()) .await .expect("subscribing never fails") - .filter(|types::ResumableEvent(_, event)| future::ready(event.channel == response_channel)); + .filter(fixtures::filter::created()); let types::ResumableEvent(_, event) = events .next() @@ -50,7 +50,11 @@ async fn new_channel() { .expect("creation event published"); assert_eq!(types::Sequence::default(), event.sequence); - assert_eq!(types::ChannelEventData::Created, event.data); + assert!(matches!( + event.data, + types::ChannelEventData::Created(event) + if event.channel == response_channel + )); } #[tokio::test] diff --git a/src/cli.rs b/src/cli.rs index 472d68f..132baf8 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -10,7 +10,7 @@ use clap::Parser; use sqlx::sqlite::SqlitePool; use tokio::net; -use crate::{app::App, channel, clock, events, login, repo::pool}; +use crate::{app::App, channel, clock, events, expire, login, repo::pool}; /// Command-line entry point for running the `hi` server. /// @@ -74,7 +74,7 @@ impl Args { let app = routers() .route_layer(middleware::from_fn_with_state( app.clone(), - events::expire::middleware, + expire::middleware, )) .route_layer(middleware::from_fn(clock::middleware)) .with_state(app); diff --git a/src/events/app.rs b/src/events/app.rs index 03f3ee6..5162c67 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -64,9 +64,10 @@ impl<'a> Events<'a> { let mut events = Vec::with_capacity(expired.len()); for (channel, message) in expired { + let sequence = tx.message_events().assign_sequence(&channel).await?; let event = tx .message_events() - .delete_expired(&channel, &message, relative_to) + .delete_expired(&channel, &message, sequence, relative_to) .await?; events.push(event); } @@ -134,9 +135,11 @@ impl<'a> Events<'a> { Ok(created_events.chain(replay).chain(live_messages).scan( resume_at, |resume_point, event| { - let channel = &event.channel.id; - let sequence = event.sequence; - resume_point.advance(channel, sequence); + let channel = &event.channel_id(); + match event.data { + types::ChannelEventData::Deleted(_) => resume_point.forget(channel), + _ => resume_point.advance(channel, event.sequence), + } let event = types::ResumableEvent(resume_point.clone(), event); diff --git a/src/events/expire.rs b/src/events/expire.rs deleted file mode 100644 index d92142d..0000000 --- a/src/events/expire.rs +++ /dev/null @@ -1,18 +0,0 @@ -use axum::{ - extract::{Request, State}, - middleware::Next, - response::Response, -}; - -use crate::{app::App, clock::RequestedAt, error::Internal}; - -// Expires messages and channels before each request. -pub async fn middleware( - State(app): State, - RequestedAt(expired_at): RequestedAt, - req: Request, - next: Next, -) -> Result { - app.events().expire(&expired_at).await?; - Ok(next.run(req).await) -} diff --git a/src/events/mod.rs b/src/events/mod.rs index 86bc5e9..711ae64 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1,6 +1,5 @@ pub mod app; pub mod broadcaster; -pub mod expire; mod extract; pub mod repo; mod routes; diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs index 32419d5..f8bae2b 100644 --- a/src/events/repo/message.rs +++ b/src/events/repo/message.rs @@ -56,8 +56,8 @@ impl<'c> Events<'c> { .map(|row| types::ChannelEvent { sequence: row.sequence, at: row.sent_at, - channel: channel.clone(), data: types::MessageEvent { + channel: channel.clone(), sender: sender.clone(), message: Message { id: row.id, @@ -72,7 +72,7 @@ impl<'c> Events<'c> { Ok(message) } - async fn assign_sequence(&mut self, channel: &Channel) -> Result { + pub async fn assign_sequence(&mut self, channel: &Channel) -> Result { let next = sqlx::query_scalar!( r#" update channel @@ -92,10 +92,9 @@ impl<'c> Events<'c> { &mut self, channel: &Channel, message: &message::Id, + sequence: Sequence, deleted_at: &DateTime, ) -> Result { - let sequence = self.assign_sequence(channel).await?; - sqlx::query_scalar!( r#" delete from message @@ -110,8 +109,8 @@ impl<'c> Events<'c> { Ok(types::ChannelEvent { sequence, at: *deleted_at, - channel: channel.clone(), data: types::MessageDeletedEvent { + channel: channel.clone(), message: message.clone(), } .into(), @@ -178,8 +177,8 @@ impl<'c> Events<'c> { .map(|row| types::ChannelEvent { sequence: row.sequence, at: row.sent_at, - channel: channel.clone(), data: types::MessageEvent { + channel: channel.clone(), sender: login::Login { id: row.sender_id, name: row.sender_name, diff --git a/src/events/types.rs b/src/events/types.rs index 9a65207..966842d 100644 --- a/src/events/types.rs +++ b/src/events/types.rs @@ -56,6 +56,11 @@ impl ResumePoint { elements.insert(channel.clone(), sequence); } + pub fn forget(&mut self, channel: &channel::Id) { + let Self(elements) = self; + elements.remove(channel); + } + pub fn get(&self, channel: &channel::Id) -> Option { let Self(elements) = self; elements.get(channel).copied() @@ -92,7 +97,6 @@ pub struct ChannelEvent { #[serde(skip)] pub sequence: Sequence, pub at: DateTime, - pub channel: Channel, #[serde(flatten)] pub data: ChannelEventData, } @@ -102,45 +106,78 @@ impl ChannelEvent { Self { at: channel.created_at, sequence: Sequence::default(), - channel, - data: ChannelEventData::Created, + data: CreatedEvent { channel }.into(), + } + } + + pub fn channel_id(&self) -> &channel::Id { + match &self.data { + ChannelEventData::Created(event) => &event.channel.id, + ChannelEventData::Message(event) => &event.channel.id, + ChannelEventData::MessageDeleted(event) => &event.channel.id, + ChannelEventData::Deleted(event) => &event.channel, } } } impl ResumeElement for ChannelEvent { fn element(&self) -> (&channel::Id, Sequence) { - (&self.channel.id, self.sequence) + (self.channel_id(), self.sequence) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum ChannelEventData { - Created, + Created(CreatedEvent), Message(MessageEvent), MessageDeleted(MessageDeletedEvent), + Deleted(DeletedEvent), +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct CreatedEvent { + pub channel: Channel, +} + +impl From for ChannelEventData { + fn from(event: CreatedEvent) -> Self { + Self::Created(event) + } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct MessageEvent { + pub channel: Channel, pub sender: Login, pub message: message::Message, } impl From for ChannelEventData { - fn from(message: MessageEvent) -> Self { - Self::Message(message) + fn from(event: MessageEvent) -> Self { + Self::Message(event) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct MessageDeletedEvent { + pub channel: Channel, pub message: message::Id, } impl From for ChannelEventData { - fn from(message: MessageDeletedEvent) -> Self { - Self::MessageDeleted(message) + fn from(event: MessageDeletedEvent) -> Self { + Self::MessageDeleted(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct DeletedEvent { + pub channel: channel::Id, +} + +impl From for ChannelEventData { + fn from(event: DeletedEvent) -> Self { + Self::Deleted(event) } } diff --git a/src/expire.rs b/src/expire.rs new file mode 100644 index 0000000..16006d1 --- /dev/null +++ b/src/expire.rs @@ -0,0 +1,20 @@ +use axum::{ + extract::{Request, State}, + middleware::Next, + response::Response, +}; + +use crate::{app::App, clock::RequestedAt, error::Internal}; + +// Expires messages and channels before each request. +pub async fn middleware( + State(app): State, + RequestedAt(expired_at): RequestedAt, + req: Request, + next: Next, +) -> Result { + app.logins().expire(&expired_at).await?; + app.events().expire(&expired_at).await?; + app.channels().expire(&expired_at).await?; + Ok(next.run(req).await) +} diff --git a/src/lib.rs b/src/lib.rs index f731e57..4139d4d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ pub mod cli; mod clock; mod error; mod events; +mod expire; mod id; mod login; mod password; diff --git a/src/login/app.rs b/src/login/app.rs index 10609c6..292b95f 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -60,11 +60,7 @@ impl<'a> Logins<'a> { } pub async fn validate(&self, secret: &str, used_at: &DateTime) -> Result { - // Somewhat arbitrarily, expire after 7 days. - let expire_at = used_at.to_owned() - TimeDelta::days(7); - let mut tx = self.db.begin().await?; - tx.tokens().expire(&expire_at).await?; let login = tx .tokens() .validate(secret, used_at) @@ -75,6 +71,17 @@ impl<'a> Logins<'a> { Ok(login) } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 7 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + tx.tokens().expire(&expire_at).await?; + tx.commit().await?; + + Ok(()) + } + pub async fn logout(&self, secret: &str) -> Result<(), ValidateError> { let mut tx = self.db.begin().await?; tx.tokens() diff --git a/src/login/routes/test/login.rs b/src/login/routes/test/login.rs index d92c01b..719ccca 100644 --- a/src/login/routes/test/login.rs +++ b/src/login/routes/test/login.rs @@ -126,6 +126,12 @@ async fn token_expires() { // Verify the semantics + let expired_at = fixtures::now(); + app.logins() + .expire(&expired_at) + .await + .expect("expiring tokens never fails"); + let verified_at = fixtures::now(); let error = app .logins() diff --git a/src/repo/channel.rs b/src/repo/channel.rs index 6514426..3c7468f 100644 --- a/src/repo/channel.rs +++ b/src/repo/channel.rs @@ -2,7 +2,11 @@ use std::fmt; use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use crate::{clock::DateTime, events::types::Sequence, id::Id as BaseId}; +use crate::{ + clock::DateTime, + events::types::{self, Sequence}, + id::Id as BaseId, +}; pub trait Provider { fn channels(&mut self) -> Channels; @@ -91,6 +95,52 @@ impl<'c> Channels<'c> { Ok(channels) } + + pub async fn delete_expired( + &mut self, + channel: &Channel, + sequence: Sequence, + deleted_at: &DateTime, + ) -> Result { + let channel = channel.id.clone(); + sqlx::query_scalar!( + r#" + delete from channel + where id = $1 + returning 1 as "row: i64" + "#, + channel, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(types::ChannelEvent { + sequence, + at: *deleted_at, + data: types::DeletedEvent { channel }.into(), + }) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { + let channels = sqlx::query_as!( + Channel, + r#" + select + channel.id as "id: Id", + channel.name, + channel.created_at as "created_at: DateTime" + from channel + left join message + where created_at < $1 + and message.id is null + "#, + expired_at, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } } // Stable identifier for a [Channel]. Prefixed with `C`. diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs index 8847e13..fbebced 100644 --- a/src/test/fixtures/filter.rs +++ b/src/test/fixtures/filter.rs @@ -7,3 +7,9 @@ pub fn messages() -> impl FnMut(&types::ResumableEvent) -> future::Ready { future::ready(matches!(event.data, types::ChannelEventData::Message(_))) } } + +pub fn created() -> impl FnMut(&types::ResumableEvent) -> future::Ready { + |types::ResumableEvent(_, event)| { + future::ready(matches!(event.data, types::ChannelEventData::Created(_))) + } +} -- cgit v1.2.3