-rw-r--r--  .sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json | 12
-rw-r--r--  .sqlx/query-aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf.json (renamed from .sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json) | 6
-rw-r--r--  .sqlx/query-df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c.json | 38
-rw-r--r--  .sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json | 20
-rw-r--r--  src/channel/routes/test/on_create.rs | 2
-rw-r--r--  src/channel/routes/test/on_send.rs | 4
-rw-r--r--  src/cli.rs | 4
-rw-r--r--  src/events/app.rs | 40
-rw-r--r--  src/events/expire.rs | 18
-rw-r--r--  src/events/mod.rs | 1
-rw-r--r--  src/events/repo/message.rs | 70
-rw-r--r--  src/events/routes.rs | 5
-rw-r--r--  src/events/routes/test.rs | 91
-rw-r--r--  src/events/types.rs | 12
-rw-r--r--  src/repo/channel.rs | 8
-rw-r--r--  src/test/fixtures/filter.rs | 9
-rw-r--r--  src/test/fixtures/mod.rs | 1
17 files changed, 229 insertions, 112 deletions
diff --git a/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json b/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json
deleted file mode 100644
index 9edc1af..0000000
--- a/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
-  "db_name": "SQLite",
-  "query": "\n delete from message\n where sent_at < $1\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Right": 1
-    },
-    "nullable": []
-  },
-  "hash": "61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d"
-}
diff --git a/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json b/.sqlx/query-aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf.json
index 64d56dd..5c27826 100644
--- a/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json
+++ b/.sqlx/query-aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf.json
@@ -1,6 +1,6 @@
 {
   "db_name": "SQLite",
-  "query": "\n insert\n into channel (id, name, created_at)\n values ($1, $2, $3)\n returning\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\"\n ",
+  "query": "\n insert\n into channel (id, name, created_at, last_sequence)\n values ($1, $2, $3, $4)\n returning\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\"\n ",
   "describe": {
     "columns": [
       {
@@ -20,7 +20,7 @@
       }
     ],
     "parameters": {
-      "Right": 3
+      "Right": 4
     },
     "nullable": [
       false,
@@ -28,5 +28,5 @@
       false
     ]
   },
-  "hash": "bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33"
+  "hash": "aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf"
 }
diff --git a/.sqlx/query-df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c.json b/.sqlx/query-df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c.json
new file mode 100644
index 0000000..87e478e
--- /dev/null
+++ b/.sqlx/query-df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c.json
@@ -0,0 +1,38 @@
+{
+  "db_name": "SQLite",
+  "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n channel.created_at as \"channel_created_at: DateTime\",\n message.id as \"message: message::Id\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where sent_at < $1\n ",
+  "describe": {
+    "columns": [
+      {
+        "name": "channel_id: channel::Id",
+        "ordinal": 0,
+        "type_info": "Text"
+      },
+      {
+        "name": "channel_name",
+        "ordinal": 1,
+        "type_info": "Text"
+      },
+      {
+        "name": "channel_created_at: DateTime",
+        "ordinal": 2,
+        "type_info": "Text"
+      },
+      {
+        "name": "message: message::Id",
+        "ordinal": 3,
+        "type_info": "Text"
+      }
+    ],
+    "parameters": {
+      "Right": 1
+    },
+    "nullable": [
+      false,
+      false,
+      false,
+      false
+    ]
+  },
+  "hash": "df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c"
+}
diff --git a/.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json b/.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json
new file mode 100644
index 0000000..7b1d2d8
--- /dev/null
+++ b/.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json
@@ -0,0 +1,20 @@
+{
+  "db_name": "SQLite",
+  "query": "\n delete from message\n where id = $1\n returning 1 as \"row: i64\"\n ",
+  "describe": {
+    "columns": [
+      {
+        "name": "row: i64",
+        "ordinal": 0,
+        "type_info": "Null"
+      }
+    ],
+    "parameters": {
+      "Right": 1
+    },
+    "nullable": [
+      null
+    ]
+  },
+  "hash": "f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9"
+}
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index bb6697f..5e62d7f 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -38,7 +38,7 @@ async fn new_channel() {
 
     let mut events = app
         .events()
-        .subscribe(&fixtures::now(), types::ResumePoint::default())
+        .subscribe(types::ResumePoint::default())
         .await
         .expect("subscribing never fails")
         .filter(|types::ResumableEvent(_, event)| future::ready(event.channel == response_channel));
diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs
index 20ae016..233518b 100644
--- a/src/channel/routes/test/on_send.rs
+++ b/src/channel/routes/test/on_send.rs
@@ -41,12 +41,12 @@ async fn messages_in_order() {
 
     // Verify the semantics
 
-    let subscribed_at = fixtures::now();
     let events = app
         .events()
-        .subscribe(&subscribed_at, types::ResumePoint::default())
+        .subscribe(types::ResumePoint::default())
         .await
         .expect("subscribing to a valid channel")
+        .filter(fixtures::filter::messages())
         .take(requests.len());
 
     let events = events.collect::<Vec<_>>().immediately().await;
diff --git a/src/cli.rs b/src/cli.rs
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -72,6 +72,10 @@ impl Args {
         let app = App::from(pool);
 
         let app = routers()
+            .route_layer(middleware::from_fn_with_state(
+                app.clone(),
+                events::expire::middleware,
+            ))
             .route_layer(middleware::from_fn(clock::middleware))
             .with_state(app);
 
diff --git a/src/events/app.rs b/src/events/app.rs
index 134e86a..03f3ee6 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -55,14 +55,35 @@ impl<'a> Events<'a> {
         Ok(event)
     }
 
+    pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+        // Somewhat arbitrarily, expire after 90 days.
+        let expire_at = relative_to.to_owned() - TimeDelta::days(90);
+
+        let mut tx = self.db.begin().await?;
+        let expired = tx.message_events().expired(&expire_at).await?;
+
+        let mut events = Vec::with_capacity(expired.len());
+        for (channel, message) in expired {
+            let event = tx
+                .message_events()
+                .delete_expired(&channel, &message, relative_to)
+                .await?;
+            events.push(event);
+        }
+
+        tx.commit().await?;
+
+        for event in events {
+            self.broadcaster.broadcast(&event);
+        }
+
+        Ok(())
+    }
+
     pub async fn subscribe(
         &self,
-        subscribed_at: &DateTime,
         resume_at: ResumePoint,
     ) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> {
-        // Somewhat arbitrarily, expire after 90 days.
-        let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);
-
         let mut tx = self.db.begin().await?;
 
         let channels = tx.channels().all().await?;
@@ -81,8 +102,6 @@
         // querying the DB. We'll prune out duplicates later.
         let live_messages = self.broadcaster.subscribe();
 
-        tx.message_events().expire(&expire_at).await?;
-
         let mut replays = BTreeMap::new();
         let mut resume_live_at = resume_at.clone();
        for channel in channels {
@@ -107,9 +126,6 @@
         // * resume is redundant with the resume_at argument to
         //   `tx.broadcasts().replay(…)`.
         let live_messages = live_messages
-            // Sure, it's temporally improbable that we'll ever skip a message
-            // that's 90 days old, but there's no reason not to be thorough.
-            .filter(Self::skip_expired(&expire_at))
             // Filtering on the broadcast resume point filters out messages
             // before resume_at, and filters out messages duplicated from
             // stored_messages.
@@ -134,12 +150,6 @@ impl<'a> Events<'a> {
     ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
         move |event| future::ready(resume_at.not_after(event))
     }
-    fn skip_expired(
-        expire_at: &DateTime,
-    ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
-        let expire_at = expire_at.to_owned();
-        move |event| future::ready(expire_at < event.at)
-    }
 }
 
 #[derive(Debug, thiserror::Error)]
diff --git a/src/events/expire.rs b/src/events/expire.rs
new file mode 100644
index 0000000..d92142d
--- /dev/null
+++ b/src/events/expire.rs
@@ -0,0 +1,18 @@
+use axum::{
+    extract::{Request, State},
+    middleware::Next,
+    response::Response,
+};
+
+use crate::{app::App, clock::RequestedAt, error::Internal};
+
+// Expires messages and channels before each request.
+pub async fn middleware(
+    State(app): State<App>,
+    RequestedAt(expired_at): RequestedAt,
+    req: Request,
+    next: Next,
+) -> Result<Response, Internal> {
+    app.events().expire(&expired_at).await?;
+    Ok(next.run(req).await)
+}
diff --git a/src/events/mod.rs b/src/events/mod.rs
index 711ae64..86bc5e9 100644
--- a/src/events/mod.rs
+++ b/src/events/mod.rs
@@ -1,5 +1,6 @@
 pub mod app;
 pub mod broadcaster;
+pub mod expire;
 mod extract;
 pub mod repo;
 mod routes;
diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs
index f6fce0e..32419d5 100644
--- a/src/events/repo/message.rs
+++ b/src/events/repo/message.rs
@@ -6,7 +6,7 @@ use crate::{
     repo::{
         channel::{self, Channel},
         login::{self, Login},
-        message,
+        message::{self, Message},
     },
 };
 
@@ -30,7 +30,7 @@ impl<'c> Events<'c> {
         body: &str,
         sent_at: &DateTime,
     ) -> Result<types::ChannelEvent, sqlx::Error> {
-        let sequence = self.assign_sequence(&channel.id).await?;
+        let sequence = self.assign_sequence(channel).await?;
 
         let id = message::Id::generate();
 
@@ -59,7 +59,7 @@
             channel: channel.clone(),
             data: types::MessageEvent {
                 sender: sender.clone(),
-                message: message::Message {
+                message: Message {
                     id: row.id,
                     body: row.body,
                 },
@@ -72,7 +72,7 @@
         Ok(message)
     }
 
-    async fn assign_sequence(&mut self, channel: &channel::Id) -> Result<Sequence, sqlx::Error> {
+    async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> {
         let next = sqlx::query_scalar!(
             r#"
                 update channel
@@ -80,7 +80,7 @@
                 where id = $1
                 returning last_sequence as "next_sequence: Sequence"
             "#,
-            channel,
+            channel.id,
         )
         .fetch_one(&mut *self.0)
         .await?;
@@ -88,18 +88,68 @@
         Ok(next)
     }
 
-    pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> {
-        sqlx::query!(
+    pub async fn delete_expired(
+        &mut self,
+        channel: &Channel,
+        message: &message::Id,
+        deleted_at: &DateTime,
+    ) -> Result<types::ChannelEvent, sqlx::Error> {
+        let sequence = self.assign_sequence(channel).await?;
+
+        sqlx::query_scalar!(
             r#"
                 delete from message
+                where id = $1
+                returning 1 as "row: i64"
+            "#,
+            message,
+        )
+        .fetch_one(&mut *self.0)
+        .await?;
+
+        Ok(types::ChannelEvent {
+            sequence,
+            at: *deleted_at,
+            channel: channel.clone(),
+            data: types::MessageDeletedEvent {
+                message: message.clone(),
+            }
+            .into(),
+        })
+    }
+
+    pub async fn expired(
+        &mut self,
+        expire_at: &DateTime,
+    ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> {
+        let messages = sqlx::query!(
+            r#"
+                select
+                    channel.id as "channel_id: channel::Id",
+                    channel.name as "channel_name",
+                    channel.created_at as "channel_created_at: DateTime",
+                    message.id as "message: message::Id"
+                from message
+                join channel on message.channel = channel.id
+                join login as sender on message.sender = sender.id
                 where sent_at < $1
             "#,
             expire_at,
         )
-        .execute(&mut *self.0)
+        .map(|row| {
+            (
+                Channel {
+                    id: row.channel_id,
+                    name: row.channel_name,
+                    created_at: row.channel_created_at,
+                },
+                row.message,
+            )
+        })
+        .fetch_all(&mut *self.0)
         .await?;
 
-        Ok(())
+        Ok(messages)
     }
 
     pub async fn replay(
@@ -134,7 +184,7 @@
                     id: row.sender_id,
                     name: row.sender_name,
                 },
-                message: message::Message {
+                message: Message {
                     id: row.id,
                     body: row.body,
                 },
diff --git a/src/events/routes.rs b/src/events/routes.rs
index 3f70dcd..89c942c 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -13,7 +13,7 @@ use super::{
     extract::LastEventId,
     types::{self, ResumePoint},
 };
-use crate::{app::App, clock::RequestedAt, error::Internal, repo::login::Login};
+use crate::{app::App, error::Internal, repo::login::Login};
 
 #[cfg(test)]
 mod test;
@@ -24,7 +24,6 @@ pub fn router() -> Router<App> {
 
 async fn events(
     State(app): State<App>,
-    RequestedAt(subscribed_at): RequestedAt,
     _: Login, // requires auth, but doesn't actually care who you are
     last_event_id: Option<LastEventId<ResumePoint>>,
 ) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, Internal> {
@@ -32,7 +31,7 @@ async fn events(
         .map(LastEventId::into_inner)
         .unwrap_or_default();
 
-    let stream = app.events().subscribe(&subscribed_at, resume_at).await?;
+    let stream = app.events().subscribe(resume_at).await?;
 
     Ok(Events(stream))
 }
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index 55ada95..a6e2275 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -21,14 +21,14 @@ async fn includes_historical_message() {
     // Call the endpoint
 
     let subscriber = fixtures::login::create(&app).await;
-    let subscribed_at = fixtures::now();
-    let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None)
+    let routes::Events(events) = routes::events(State(app), subscriber, None)
         .await
         .expect("subscribe never fails");
 
     // Verify the structure of the response.
 
     let types::ResumableEvent(_, event) = events
+        .filter(fixtures::filter::messages())
         .next()
         .immediately()
         .await
@@ -47,11 +47,9 @@ async fn includes_live_message() {
     // Call the endpoint
 
     let subscriber = fixtures::login::create(&app).await;
-    let subscribed_at = fixtures::now();
-    let routes::Events(mut events) =
-        routes::events(State(app.clone()), subscribed_at, subscriber, None)
-            .await
-            .expect("subscribe never fails");
+    let routes::Events(events) = routes::events(State(app.clone()), subscriber, None)
+        .await
+        .expect("subscribe never fails");
 
     // Verify the semantics
 
@@ -59,6 +57,7 @@ async fn includes_live_message() {
     let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
 
     let types::ResumableEvent(_, event) = events
+        .filter(fixtures::filter::messages())
         .next()
         .immediately()
         .await
@@ -92,14 +91,14 @@ async fn includes_multiple_channels() {
     // Call the endpoint
 
     let subscriber = fixtures::login::create(&app).await;
-    let subscribed_at = fixtures::now();
-    let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None)
+    let routes::Events(events) = routes::events(State(app), subscriber, None)
         .await
         .expect("subscribe never fails");
 
     // Verify the structure of the response.
 
     let events = events
+        .filter(fixtures::filter::messages())
         .take(messages.len())
         .collect::<Vec<_>>()
         .immediately()
@@ -129,8 +128,7 @@ async fn sequential_messages() {
     // Call the endpoint
 
     let subscriber = fixtures::login::create(&app).await;
-    let subscribed_at = fixtures::now();
-    let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None)
+    let routes::Events(events) = routes::events(State(app), subscriber, None)
         .await
         .expect("subscribe never fails");
 
@@ -169,17 +167,19 @@ async fn resumes_from() {
     // Call the endpoint
 
     let subscriber = fixtures::login::create(&app).await;
-    let subscribed_at = fixtures::now();
 
     let resume_at = {
         // First subscription
-        let routes::Events(mut events) =
-            routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None)
-                .await
-                .expect("subscribe never fails");
+        let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
+            .await
+            .expect("subscribe never fails");
 
-        let types::ResumableEvent(last_event_id, event) =
-            events.next().immediately().await.expect("delivered events");
+        let types::ResumableEvent(last_event_id, event) = events
+            .filter(fixtures::filter::messages())
+            .next()
+            .immediately()
+            .await
+            .expect("delivered events");
 
         assert_eq!(initial_message, event);
 
@@ -187,11 +187,9 @@ async fn resumes_from() {
     };
 
     // Resume after disconnect
-    let reconnect_at = fixtures::now();
-    let routes::Events(resumed) =
-        routes::events(State(app), reconnect_at, subscriber, Some(resume_at.into()))
-            .await
-            .expect("subscribe never fails");
+    let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into()))
+        .await
+        .expect("subscribe never fails");
 
     // Verify the structure of the response.
 
@@ -243,13 +241,12 @@ async fn serial_resume() {
     ];
 
     // First subscription
-    let subscribed_at = fixtures::now();
-    let routes::Events(events) =
-        routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None)
-            .await
-            .expect("subscribe never fails");
+    let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
+        .await
+        .expect("subscribe never fails");
 
     let events = events
+        .filter(fixtures::filter::messages())
         .take(initial_messages.len())
         .collect::<Vec<_>>()
         .immediately()
@@ -277,10 +274,8 @@ async fn serial_resume() {
     ];
 
     // Second subscription
-    let resubscribed_at = fixtures::now();
     let routes::Events(events) = routes::events(
         State(app.clone()),
-        resubscribed_at,
         subscriber.clone(),
         Some(resume_at.into()),
     )
@@ -288,6 +283,7 @@ async fn serial_resume() {
     .expect("subscribe never fails");
 
     let events = events
+        .filter(fixtures::filter::messages())
         .take(resume_messages.len())
         .collect::<Vec<_>>()
         .immediately()
@@ -314,11 +310,9 @@ async fn serial_resume() {
         fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
     ];
 
-    // Second subscription
-    let resubscribed_at = fixtures::now();
+    // Third subscription
     let routes::Events(events) = routes::events(
         State(app.clone()),
-        resubscribed_at,
         subscriber.clone(),
         Some(resume_at.into()),
    )
@@ -326,6 +320,7 @@ async fn serial_resume() {
     .expect("subscribe never fails");
 
     let events = events
+        .filter(fixtures::filter::messages())
         .take(final_messages.len())
         .collect::<Vec<_>>()
         .immediately()
@@ -340,33 +335,3 @@ async fn serial_resume() {
         }
     };
 }
-
-#[tokio::test]
-async fn removes_expired_messages() {
-    // Set up the environment
-    let app = fixtures::scratch_app().await;
-    let sender = fixtures::login::create(&app).await;
-    let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
-    fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await;
-    let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
-
-    // Call the endpoint
-
-    let subscriber = fixtures::login::create(&app).await;
-    let subscribed_at = fixtures::now();
-
-    let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None)
-        .await
-        .expect("subscribe never fails");
-
-    // Verify the semantics
-
-    let types::ResumableEvent(_, event) = events
-        .next()
-        .immediately()
-        .await
-        .expect("delivered messages");
-
-    assert_eq!(message, event);
-}
diff --git a/src/events/types.rs b/src/events/types.rs
index 944321a..9a65207 100644
--- a/src/events/types.rs
+++ b/src/events/types.rs
@@ -119,6 +119,7 @@ impl ResumeElement for ChannelEvent {
 pub enum ChannelEventData {
     Created,
     Message(MessageEvent),
+    MessageDeleted(MessageDeletedEvent),
 }
 
 #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
@@ -132,3 +133,14 @@ impl From<MessageEvent> for ChannelEventData {
         Self::Message(message)
     }
 }
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct MessageDeletedEvent {
+    pub message: message::Id,
+}
+
+impl From<MessageDeletedEvent> for ChannelEventData {
+    fn from(message: MessageDeletedEvent) -> Self {
+        Self::MessageDeleted(message)
+    }
+}
diff --git a/src/repo/channel.rs b/src/repo/channel.rs
index e85b898..6514426 100644
--- a/src/repo/channel.rs
+++ b/src/repo/channel.rs
@@ -2,7 +2,7 @@ use std::fmt;
 
 use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
 
-use crate::{clock::DateTime, id::Id as BaseId};
+use crate::{clock::DateTime, events::types::Sequence, id::Id as BaseId};
 
 pub trait Provider {
     fn channels(&mut self) -> Channels;
@@ -31,13 +31,14 @@ impl<'c> Channels<'c> {
         created_at: &DateTime,
     ) -> Result<Channel, sqlx::Error> {
         let id = Id::generate();
+        let sequence = Sequence::default();
 
         let channel = sqlx::query_as!(
             Channel,
             r#"
                 insert
-                into channel (id, name, created_at)
-                values ($1, $2, $3)
+                into channel (id, name, created_at, last_sequence)
+                values ($1, $2, $3, $4)
                 returning
                     id as "id: Id",
                     name,
@@ -46,6 +47,7 @@ impl<'c> Channels<'c> {
             id,
             name,
             created_at,
+            sequence,
         )
         .fetch_one(&mut *self.0)
         .await?;
diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs
new file mode 100644
index 0000000..8847e13
--- /dev/null
+++ b/src/test/fixtures/filter.rs
@@ -0,0 +1,9 @@
+use futures::future;
+
+use crate::events::types;
+
+pub fn messages() -> impl FnMut(&types::ResumableEvent) -> future::Ready<bool> {
+    |types::ResumableEvent(_, event)| {
+        future::ready(matches!(event.data, types::ChannelEventData::Message(_)))
+    }
+}
diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs
index 450fbec..d1dd0c3 100644
--- a/src/test/fixtures/mod.rs
+++ b/src/test/fixtures/mod.rs
@@ -3,6 +3,7 @@ use chrono::{TimeDelta, Utc};
 use crate::{app::App, clock::RequestedAt, repo::pool};
 
 pub mod channel;
+pub mod filter;
 pub mod future;
 pub mod identity;
 pub mod login;
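For context on how the new expiry pass is attached, the harness change above layers events::expire::middleware onto the routes with axum's middleware::from_fn_with_state, so expired messages are deleted (and MessageDeleted events broadcast) before each request is handled. The following is a minimal, self-contained sketch of that wiring pattern only — AppState, the /healthz route, and the no-op middleware body are hypothetical stand-ins for this crate's App, routers(), and events::expire::middleware, not its actual code:

    use axum::{
        extract::{Request, State},
        middleware::{self, Next},
        response::Response,
        routing::get,
        Router,
    };

    // Hypothetical stand-in for this crate's `App`.
    #[derive(Clone)]
    struct AppState;

    // Stand-in for `events::expire::middleware`: runs before every routed request.
    async fn expire_middleware(State(_app): State<AppState>, req: Request, next: Next) -> Response {
        // The real middleware calls `app.events().expire(...)` with the request
        // timestamp here and returns an error response if that fails.
        next.run(req).await
    }

    fn router() -> Router {
        let state = AppState;
        Router::new()
            .route("/healthz", get(|| async { "ok" }))
            // Expiry runs before the routed handler, mirroring the route_layer call in the diff.
            .route_layer(middleware::from_fn_with_state(state.clone(), expire_middleware))
            .with_state(state)
    }

Moving expiry out of Events::subscribe and into a per-request layer also means each deletion is assigned a channel sequence number and broadcast as a MessageDeleted event, so resumed subscribers observe the removal instead of silently losing history.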
