diff options
Diffstat (limited to 'src/events')
| -rw-r--r-- | src/events/app.rs | 40 | ||||
| -rw-r--r-- | src/events/expire.rs | 18 | ||||
| -rw-r--r-- | src/events/mod.rs | 1 | ||||
| -rw-r--r-- | src/events/repo/message.rs | 70 | ||||
| -rw-r--r-- | src/events/routes.rs | 5 | ||||
| -rw-r--r-- | src/events/routes/test.rs | 91 | ||||
| -rw-r--r-- | src/events/types.rs | 12 |
7 files changed, 146 insertions, 91 deletions
diff --git a/src/events/app.rs b/src/events/app.rs index 134e86a..03f3ee6 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -55,14 +55,35 @@ impl<'a> Events<'a> { Ok(event) } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + let expired = tx.message_events().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for (channel, message) in expired { + let event = tx + .message_events() + .delete_expired(&channel, &message, relative_to) + .await?; + events.push(event); + } + + tx.commit().await?; + + for event in events { + self.broadcaster.broadcast(&event); + } + + Ok(()) + } + pub async fn subscribe( &self, - subscribed_at: &DateTime, resume_at: ResumePoint, ) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> { - // Somewhat arbitrarily, expire after 90 days. - let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); - let mut tx = self.db.begin().await?; let channels = tx.channels().all().await?; @@ -81,8 +102,6 @@ impl<'a> Events<'a> { // querying the DB. We'll prune out duplicates later. let live_messages = self.broadcaster.subscribe(); - tx.message_events().expire(&expire_at).await?; - let mut replays = BTreeMap::new(); let mut resume_live_at = resume_at.clone(); for channel in channels { @@ -107,9 +126,6 @@ impl<'a> Events<'a> { // * resume is redundant with the resume_at argument to // `tx.broadcasts().replay(…)`. let live_messages = live_messages - // Sure, it's temporally improbable that we'll ever skip a message - // that's 90 days old, but there's no reason not to be thorough. - .filter(Self::skip_expired(&expire_at)) // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // stored_messages. @@ -134,12 +150,6 @@ impl<'a> Events<'a> { ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { move |event| future::ready(resume_at.not_after(event)) } - fn skip_expired( - expire_at: &DateTime, - ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { - let expire_at = expire_at.to_owned(); - move |event| future::ready(expire_at < event.at) - } } #[derive(Debug, thiserror::Error)] diff --git a/src/events/expire.rs b/src/events/expire.rs new file mode 100644 index 0000000..d92142d --- /dev/null +++ b/src/events/expire.rs @@ -0,0 +1,18 @@ +use axum::{ + extract::{Request, State}, + middleware::Next, + response::Response, +}; + +use crate::{app::App, clock::RequestedAt, error::Internal}; + +// Expires messages and channels before each request. +pub async fn middleware( + State(app): State<App>, + RequestedAt(expired_at): RequestedAt, + req: Request, + next: Next, +) -> Result<Response, Internal> { + app.events().expire(&expired_at).await?; + Ok(next.run(req).await) +} diff --git a/src/events/mod.rs b/src/events/mod.rs index 711ae64..86bc5e9 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1,5 +1,6 @@ pub mod app; pub mod broadcaster; +pub mod expire; mod extract; pub mod repo; mod routes; diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs index f6fce0e..32419d5 100644 --- a/src/events/repo/message.rs +++ b/src/events/repo/message.rs @@ -6,7 +6,7 @@ use crate::{ repo::{ channel::{self, Channel}, login::{self, Login}, - message, + message::{self, Message}, }, }; @@ -30,7 +30,7 @@ impl<'c> Events<'c> { body: &str, sent_at: &DateTime, ) -> Result<types::ChannelEvent, sqlx::Error> { - let sequence = self.assign_sequence(&channel.id).await?; + let sequence = self.assign_sequence(channel).await?; let id = message::Id::generate(); @@ -59,7 +59,7 @@ impl<'c> Events<'c> { channel: channel.clone(), data: types::MessageEvent { sender: sender.clone(), - message: message::Message { + message: Message { id: row.id, body: row.body, }, @@ -72,7 +72,7 @@ impl<'c> Events<'c> { Ok(message) } - async fn assign_sequence(&mut self, channel: &channel::Id) -> Result<Sequence, sqlx::Error> { + async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> { let next = sqlx::query_scalar!( r#" update channel @@ -80,7 +80,7 @@ impl<'c> Events<'c> { where id = $1 returning last_sequence as "next_sequence: Sequence" "#, - channel, + channel.id, ) .fetch_one(&mut *self.0) .await?; @@ -88,18 +88,68 @@ impl<'c> Events<'c> { Ok(next) } - pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { - sqlx::query!( + pub async fn delete_expired( + &mut self, + channel: &Channel, + message: &message::Id, + deleted_at: &DateTime, + ) -> Result<types::ChannelEvent, sqlx::Error> { + let sequence = self.assign_sequence(channel).await?; + + sqlx::query_scalar!( r#" delete from message + where id = $1 + returning 1 as "row: i64" + "#, + message, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(types::ChannelEvent { + sequence, + at: *deleted_at, + channel: channel.clone(), + data: types::MessageDeletedEvent { + message: message.clone(), + } + .into(), + }) + } + + pub async fn expired( + &mut self, + expire_at: &DateTime, + ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + channel.created_at as "channel_created_at: DateTime", + message.id as "message: message::Id" + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id where sent_at < $1 "#, expire_at, ) - .execute(&mut *self.0) + .map(|row| { + ( + Channel { + id: row.channel_id, + name: row.channel_name, + created_at: row.channel_created_at, + }, + row.message, + ) + }) + .fetch_all(&mut *self.0) .await?; - Ok(()) + Ok(messages) } pub async fn replay( @@ -134,7 +184,7 @@ impl<'c> Events<'c> { id: row.sender_id, name: row.sender_name, }, - message: message::Message { + message: Message { id: row.id, body: row.body, }, diff --git a/src/events/routes.rs b/src/events/routes.rs index 3f70dcd..89c942c 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -13,7 +13,7 @@ use super::{ extract::LastEventId, types::{self, ResumePoint}, }; -use crate::{app::App, clock::RequestedAt, error::Internal, repo::login::Login}; +use crate::{app::App, error::Internal, repo::login::Login}; #[cfg(test)] mod test; @@ -24,7 +24,6 @@ pub fn router() -> Router<App> { async fn events( State(app): State<App>, - RequestedAt(subscribed_at): RequestedAt, _: Login, // requires auth, but doesn't actually care who you are last_event_id: Option<LastEventId<ResumePoint>>, ) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, Internal> { @@ -32,7 +31,7 @@ async fn events( .map(LastEventId::into_inner) .unwrap_or_default(); - let stream = app.events().subscribe(&subscribed_at, resume_at).await?; + let stream = app.events().subscribe(resume_at).await?; Ok(Events(stream)) } diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index 55ada95..a6e2275 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -21,14 +21,14 @@ async fn includes_historical_message() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None) + let routes::Events(events) = routes::events(State(app), subscriber, None) .await .expect("subscribe never fails"); // Verify the structure of the response. let types::ResumableEvent(_, event) = events + .filter(fixtures::filter::messages()) .next() .immediately() .await @@ -47,11 +47,9 @@ async fn includes_live_message() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let routes::Events(mut events) = - routes::events(State(app.clone()), subscribed_at, subscriber, None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = routes::events(State(app.clone()), subscriber, None) + .await + .expect("subscribe never fails"); // Verify the semantics @@ -59,6 +57,7 @@ async fn includes_live_message() { let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; let types::ResumableEvent(_, event) = events + .filter(fixtures::filter::messages()) .next() .immediately() .await @@ -92,14 +91,14 @@ async fn includes_multiple_channels() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None) + let routes::Events(events) = routes::events(State(app), subscriber, None) .await .expect("subscribe never fails"); // Verify the structure of the response. let events = events + .filter(fixtures::filter::messages()) .take(messages.len()) .collect::<Vec<_>>() .immediately() @@ -129,8 +128,7 @@ async fn sequential_messages() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None) + let routes::Events(events) = routes::events(State(app), subscriber, None) .await .expect("subscribe never fails"); @@ -169,17 +167,19 @@ async fn resumes_from() { // Call the endpoint let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); let resume_at = { // First subscription - let routes::Events(mut events) = - routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) + .await + .expect("subscribe never fails"); - let types::ResumableEvent(last_event_id, event) = - events.next().immediately().await.expect("delivered events"); + let types::ResumableEvent(last_event_id, event) = events + .filter(fixtures::filter::messages()) + .next() + .immediately() + .await + .expect("delivered events"); assert_eq!(initial_message, event); @@ -187,11 +187,9 @@ async fn resumes_from() { }; // Resume after disconnect - let reconnect_at = fixtures::now(); - let routes::Events(resumed) = - routes::events(State(app), reconnect_at, subscriber, Some(resume_at.into())) - .await - .expect("subscribe never fails"); + let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into())) + .await + .expect("subscribe never fails"); // Verify the structure of the response. @@ -243,13 +241,12 @@ async fn serial_resume() { ]; // First subscription - let subscribed_at = fixtures::now(); - let routes::Events(events) = - routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None) - .await - .expect("subscribe never fails"); + let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) + .await + .expect("subscribe never fails"); let events = events + .filter(fixtures::filter::messages()) .take(initial_messages.len()) .collect::<Vec<_>>() .immediately() @@ -277,10 +274,8 @@ async fn serial_resume() { ]; // Second subscription - let resubscribed_at = fixtures::now(); let routes::Events(events) = routes::events( State(app.clone()), - resubscribed_at, subscriber.clone(), Some(resume_at.into()), ) @@ -288,6 +283,7 @@ async fn serial_resume() { .expect("subscribe never fails"); let events = events + .filter(fixtures::filter::messages()) .take(resume_messages.len()) .collect::<Vec<_>>() .immediately() @@ -314,11 +310,9 @@ async fn serial_resume() { fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, ]; - // Second subscription - let resubscribed_at = fixtures::now(); + // Third subscription let routes::Events(events) = routes::events( State(app.clone()), - resubscribed_at, subscriber.clone(), Some(resume_at.into()), ) @@ -326,6 +320,7 @@ async fn serial_resume() { .expect("subscribe never fails"); let events = events + .filter(fixtures::filter::messages()) .take(final_messages.len()) .collect::<Vec<_>>() .immediately() @@ -340,33 +335,3 @@ async fn serial_resume() { } }; } - -#[tokio::test] -async fn removes_expired_messages() { - // Set up the environment - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - - // Call the endpoint - - let subscriber = fixtures::login::create(&app).await; - let subscribed_at = fixtures::now(); - - let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None) - .await - .expect("subscribe never fails"); - - // Verify the semantics - - let types::ResumableEvent(_, event) = events - .next() - .immediately() - .await - .expect("delivered messages"); - - assert_eq!(message, event); -} diff --git a/src/events/types.rs b/src/events/types.rs index 944321a..9a65207 100644 --- a/src/events/types.rs +++ b/src/events/types.rs @@ -119,6 +119,7 @@ impl ResumeElement for ChannelEvent { pub enum ChannelEventData { Created, Message(MessageEvent), + MessageDeleted(MessageDeletedEvent), } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] @@ -132,3 +133,14 @@ impl From<MessageEvent> for ChannelEventData { Self::Message(message) } } + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct MessageDeletedEvent { + pub message: message::Id, +} + +impl From<MessageDeletedEvent> for ChannelEventData { + fn from(message: MessageDeletedEvent) -> Self { + Self::MessageDeleted(message) + } +} |
