diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-18 23:42:08 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-18 23:42:08 -0400 |
| commit | 82338ddcb7f14ffbd584a954689f02b6e6a7988e (patch) | |
| tree | 1e0a525766ca45067bb122cad3af69437db504ca /src | |
| parent | bde5aea211e9838b4511a2b57c6a256fe89b66ab (diff) | |
| parent | 17b62b3458e3a992b93cd485b05d3fb112dd349a (diff) | |
Merge branch 'wip/retain-deleted'
Diffstat (limited to 'src')
| -rw-r--r-- | src/channel/app.rs | 24 | ||||
| -rw-r--r-- | src/channel/history.rs | 5 | ||||
| -rw-r--r-- | src/channel/repo.rs | 179 | ||||
| -rw-r--r-- | src/channel/routes/channel/delete.rs | 2 | ||||
| -rw-r--r-- | src/channel/routes/channel/test.rs | 6 | ||||
| -rw-r--r-- | src/channel/routes/test.rs | 106 | ||||
| -rw-r--r-- | src/channel/snapshot.rs | 3 | ||||
| -rw-r--r-- | src/event/repo.rs | 5 | ||||
| -rw-r--r-- | src/event/routes/test.rs | 33 | ||||
| -rw-r--r-- | src/event/sequence.rs | 11 | ||||
| -rw-r--r-- | src/expire.rs | 2 | ||||
| -rw-r--r-- | src/login/repo.rs | 15 | ||||
| -rw-r--r-- | src/message/app.rs | 24 | ||||
| -rw-r--r-- | src/message/history.rs | 5 | ||||
| -rw-r--r-- | src/message/repo.rs | 214 | ||||
| -rw-r--r-- | src/message/routes/message.rs | 6 | ||||
| -rw-r--r-- | src/message/snapshot.rs | 4 | ||||
| -rw-r--r-- | src/test/fixtures/channel.rs | 23 | ||||
| -rw-r--r-- | src/test/fixtures/event.rs | 7 | ||||
| -rw-r--r-- | src/test/fixtures/filter.rs | 11 | ||||
| -rw-r--r-- | src/test/fixtures/message.rs | 18 | ||||
| -rw-r--r-- | src/test/fixtures/mod.rs | 1 | ||||
| -rw-r--r-- | src/token/repo/auth.rs | 5 |
23 files changed, 493 insertions, 216 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 46eaba8..75c662d 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -43,7 +43,7 @@ impl<'a> Channels<'a> { let channel = tx.channels().by_id(channel).await.optional()?; tx.commit().await?; - Ok(channel.iter().flat_map(History::events).collect()) + Ok(channel.as_ref().and_then(History::as_snapshot)) } pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { @@ -54,13 +54,16 @@ impl<'a> Channels<'a> { .by_id(channel) .await .not_found(|| Error::NotFound(channel.clone()))?; + channel + .as_snapshot() + .ok_or_else(|| Error::Deleted(channel.id().clone()))?; let mut events = Vec::new(); - let messages = tx.messages().in_channel(&channel, None).await?; + let messages = tx.messages().live(&channel).await?; for message in messages { let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(message.id(), &deleted).await?; + let message = tx.messages().delete(&message, &deleted).await?; events.extend( message .events() @@ -70,7 +73,7 @@ impl<'a> Channels<'a> { } let deleted = tx.sequence().next(deleted_at).await?; - let channel = tx.channels().delete(channel.id(), &deleted).await?; + let channel = tx.channels().delete(&channel, &deleted).await?; events.extend( channel .events() @@ -115,6 +118,17 @@ impl<'a> Channels<'a> { Ok(()) } + + pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, purge after 7 days. + let purge_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + tx.channels().purge(&purge_at).await?; + tx.commit().await?; + + Ok(()) + } } #[derive(Debug, thiserror::Error)] @@ -129,6 +143,8 @@ pub enum CreateError { pub enum Error { #[error("channel {0} not found")] NotFound(Id), + #[error("channel {0} deleted")] + Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), } diff --git a/src/channel/history.rs b/src/channel/history.rs index 78b3437..4b9fcc7 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -31,6 +31,11 @@ impl History { .filter(Sequence::up_to(resume_point.into())) .collect() } + + // Snapshot of this channel as of all events recorded in this history. + pub fn as_snapshot(&self) -> Option<Channel> { + self.events().collect() + } } // Event factories diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 2f57581..1cd1c80 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -28,7 +28,7 @@ impl<'c> Channels<'c> { values ($1, $2, $3, $4) returning id as "id: Id", - name, + name as "name!", -- known non-null as we just set it created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" "#, @@ -41,11 +41,9 @@ impl<'c> Channels<'c> { channel: Channel { id: row.id, name: row.name, + deleted_at: None, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, + created: Instant::new(row.created_at, row.created_sequence), deleted: None, }) .fetch_one(&mut *self.0) @@ -59,10 +57,14 @@ impl<'c> Channels<'c> { r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel + left join channel_deleted as deleted + using (id) where id = $1 "#, channel, @@ -70,13 +72,11 @@ impl<'c> Channels<'c> { .map(|row| History { channel: Channel { id: row.id, - name: row.name, + name: row.name.unwrap_or_default(), + deleted_at: row.deleted_at, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_one(&mut *self.0) .await?; @@ -89,11 +89,15 @@ impl<'c> Channels<'c> { r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from channel - where coalesce(created_sequence <= $1, true) + left join channel_deleted as deleted + using (id) + where coalesce(channel.created_sequence <= $1, true) order by channel.name "#, resume_at, @@ -101,13 +105,11 @@ impl<'c> Channels<'c> { .map(|row| History { channel: Channel { id: row.id, - name: row.name, + name: row.name.unwrap_or_default(), + deleted_at: row.deleted_at, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -123,24 +125,26 @@ impl<'c> Channels<'c> { r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from channel - where coalesce(created_sequence > $1, true) + left join channel_deleted as deleted + using (id) + where coalesce(channel.created_sequence > $1, true) "#, resume_at, ) .map(|row| History { channel: Channel { id: row.id, - name: row.name, + name: row.name.unwrap_or_default(), + deleted_at: row.deleted_at, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -150,50 +154,109 @@ impl<'c> Channels<'c> { pub async fn delete( &mut self, - channel: &Id, + channel: &History, deleted: &Instant, ) -> Result<History, sqlx::Error> { - let channel = sqlx::query!( + let id = channel.id(); + sqlx::query_scalar!( r#" - delete from channel + insert into channel_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + returning 1 as "deleted: bool" + "#, + id, + deleted.at, + deleted.sequence, + ) + .fetch_one(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a channel is deleted, its name is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at channels via the API, cannot retrieve the + // "deleted" channel's information by ignoring the deletion event. + // + // This also avoids the need for a separate name reservation table to ensure that live channels have unique names, since the `channel` table's name field is unique over non-null values. + sqlx::query_scalar!( + r#" + update channel + set name = null where id = $1 - returning - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + returning 1 as "updated: bool" "#, - channel, + id, ) - .map(|row| History { - channel: Channel { - id: row.id, - name: row.name, - }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: Some(*deleted), - }) .fetch_one(&mut *self.0) .await?; + let channel = self.by_id(id).await?; + Ok(channel) } - pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let channels = sqlx::query_scalar!( r#" + with has_messages as ( + select channel + from message + group by channel + ) + delete from channel_deleted + where deleted_at < $1 + and id not in has_messages + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for channel in channels { + // Wanted: a way to batch these up into one query. + sqlx::query!( + r#" + delete from channel + where id = $1 + "#, + channel, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, sqlx::Error> { + let channels = sqlx::query!( + r#" select - channel.id as "id: Id" + channel.id as "id: Id", + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel - left join message - where created_at < $1 + left join channel_deleted as deleted + using (id) + left join message + where channel.created_at < $1 and message.id is null + and deleted.id is null "#, expired_at, ) + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) .fetch_all(&mut *self.0) .await?; diff --git a/src/channel/routes/channel/delete.rs b/src/channel/routes/channel/delete.rs index efac0c0..5f40ddf 100644 --- a/src/channel/routes/channel/delete.rs +++ b/src/channel/routes/channel/delete.rs @@ -32,7 +32,7 @@ impl IntoResponse for Error { let Self(error) = self; #[allow(clippy::match_wildcard_for_single_variants)] match error { - app::Error::NotFound(_) => NotFound(error).into_response(), + app::Error::NotFound(_) | app::Error::Deleted(_) => NotFound(error).into_response(), other => Internal::from(other).into_response(), } } diff --git a/src/channel/routes/channel/test.rs b/src/channel/routes/channel/test.rs index bc02b20..c6541cd 100644 --- a/src/channel/routes/channel/test.rs +++ b/src/channel/routes/channel/test.rs @@ -4,7 +4,7 @@ use futures::stream::StreamExt; use super::post; use crate::{ channel, - event::{self, Sequenced}, + event::Sequenced, message::{self, app::SendError}, test::fixtures::{self, future::Immediately as _}, }; @@ -45,7 +45,7 @@ async fn messages_in_order() { .subscribe(None) .await .expect("subscribing to a valid channel") - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .take(requests.len()); let events = events.collect::<Vec<_>>().immediately().await; @@ -54,7 +54,7 @@ async fn messages_in_order() { assert_eq!(*sent_at, event.at()); assert!(matches!( event, - event::Event::Message(message::Event::Sent(event)) + message::Event::Sent(event) if event.message.sender == sender.id && event.message.body == message )); diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs index 81f1465..ffd8484 100644 --- a/src/channel/routes/test.rs +++ b/src/channel/routes/test.rs @@ -1,10 +1,11 @@ +use std::future; + use axum::extract::{Json, State}; use futures::stream::StreamExt as _; use super::post; use crate::{ - channel::{self, app}, - event, + channel::app, test::fixtures::{self, future::Immediately as _}, }; @@ -19,29 +20,35 @@ async fn new_channel() { let name = fixtures::channel::propose(); let request = post::Request { name: name.clone() }; - let Json(response_channel) = - post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) - .await - .expect("new channel in an empty app"); + let Json(response) = post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creating a channel in an empty app succeeds"); // Verify the structure of the response - assert_eq!(name, response_channel.name); + assert_eq!(name, response.name); // Verify the semantics let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - assert!(snapshot - .channels - .iter() - .any(|channel| channel.name == response_channel.name && channel.id == response_channel.id)); + assert!(snapshot.channels.iter().any(|channel| channel == &response)); + + let channel = app + .channels() + .get(&response.id) + .await + .expect("searching for channels by ID never fails") + .expect("the newly-created channel exists"); + assert_eq!(response, channel); let mut events = app .events() .subscribe(None) .await .expect("subscribing never fails") - .filter(fixtures::filter::created()); + .filter_map(fixtures::channel::events) + .filter_map(fixtures::channel::created) + .filter(|event| future::ready(event.channel == response)); let event = events .next() @@ -49,11 +56,7 @@ async fn new_channel() { .await .expect("creation event published"); - assert!(matches!( - event, - event::Event::Channel(channel::Event::Created(event)) - if event.channel == response_channel - )); + assert_eq!(event.channel, response); } #[tokio::test] @@ -81,3 +84,72 @@ async fn duplicate_name() { app::CreateError::DuplicateName(name) if channel.name == name )); } + +#[tokio::test] +async fn name_reusable_after_delete() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::login::create(&app, &fixtures::now()).await; + let name = fixtures::channel::propose(); + + // Call the endpoint (first time) + + let request = post::Request { name: name.clone() }; + let Json(response) = post::handler( + State(app.clone()), + creator.clone(), + fixtures::now(), + Json(request), + ) + .await + .expect("new channel in an empty app"); + + // Delete the channel + + app.channels() + .delete(&response.id, &fixtures::now()) + .await + .expect("deleting a newly-created channel succeeds"); + + // Call the endpoint (second time) + + let request = post::Request { name: name.clone() }; + let Json(response) = post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("new channel in an empty app"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + assert!(snapshot.channels.iter().any(|channel| channel == &response)); + + let channel = app + .channels() + .get(&response.id) + .await + .expect("searching for channels by ID never fails") + .expect("the newly-created channel exists"); + assert_eq!(response, channel); + + let mut events = app + .events() + .subscribe(None) + .await + .expect("subscribing never fails") + .filter_map(fixtures::channel::events) + .filter_map(fixtures::channel::created) + .filter(|event| future::ready(event.channel == response)); + + let event = events + .next() + .immediately() + .await + .expect("creation event published"); + + assert_eq!(event.channel, response); +} diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs index d4d1d27..2b7d89a 100644 --- a/src/channel/snapshot.rs +++ b/src/channel/snapshot.rs @@ -2,11 +2,14 @@ use super::{ event::{Created, Event}, Id, }; +use crate::clock::DateTime; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Channel { pub id: Id, pub name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_at: Option<DateTime>, } impl Channel { diff --git a/src/event/repo.rs b/src/event/repo.rs index 40d6a53..56beeea 100644 --- a/src/event/repo.rs +++ b/src/event/repo.rs @@ -29,10 +29,7 @@ impl<'c> Sequences<'c> { .fetch_one(&mut *self.0) .await?; - Ok(Instant { - at: *at, - sequence: next, - }) + Ok(Instant::new(*at, next)) } pub async fn current(&mut self) -> Result<Sequence, sqlx::Error> { diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs index 249f5c2..e6a8b9d 100644 --- a/src/event/routes/test.rs +++ b/src/event/routes/test.rs @@ -31,7 +31,7 @@ async fn includes_historical_message() { // Verify the structure of the response. let event = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .next() .immediately() .await @@ -62,7 +62,7 @@ async fn includes_live_message() { let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; let event = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .next() .immediately() .await @@ -104,7 +104,7 @@ async fn includes_multiple_channels() { // Verify the structure of the response. let events = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .take(messages.len()) .collect::<Vec<_>>() .immediately() @@ -141,13 +141,15 @@ async fn sequential_messages() { // Verify the structure of the response. - let mut events = events.filter(|event| { - future::ready( - messages - .iter() - .any(|message| fixtures::event::message_sent(event, message)), - ) - }); + let mut events = events + .filter_map(fixtures::message::events) + .filter(|event| { + future::ready( + messages + .iter() + .any(|message| fixtures::event::message_sent(event, message)), + ) + }); // Verify delivery in order for message in &messages { @@ -193,7 +195,7 @@ async fn resumes_from() { .expect("subscribe never fails"); let event = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .next() .immediately() .await @@ -217,6 +219,7 @@ async fn resumes_from() { // Verify the structure of the response. let events = resumed + .filter_map(fixtures::message::events) .take(later_messages.len()) .collect::<Vec<_>>() .immediately() @@ -275,7 +278,7 @@ async fn serial_resume() { .expect("subscribe never fails"); let events = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .take(initial_messages.len()) .collect::<Vec<_>>() .immediately() @@ -313,7 +316,7 @@ async fn serial_resume() { .expect("subscribe never fails"); let events = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .take(resume_messages.len()) .collect::<Vec<_>>() .immediately() @@ -351,7 +354,7 @@ async fn serial_resume() { .expect("subscribe never fails"); let events = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .take(final_messages.len()) .collect::<Vec<_>>() .immediately() @@ -401,6 +404,7 @@ async fn terminates_on_token_expiry() { ]; assert!(events + .filter_map(fixtures::message::events) .filter(|event| future::ready( messages .iter() @@ -452,6 +456,7 @@ async fn terminates_on_logout() { ]; assert!(events + .filter_map(fixtures::message::events) .filter(|event| future::ready( messages .iter() diff --git a/src/event/sequence.rs b/src/event/sequence.rs index bf6d5b8..9bc399b 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -10,6 +10,17 @@ pub struct Instant { pub sequence: Sequence, } +impl Instant { + pub fn new(at: DateTime, sequence: Sequence) -> Self { + Self { at, sequence } + } + + pub fn optional(at: Option<DateTime>, sequence: Option<Sequence>) -> Option<Self> { + at.zip(sequence) + .map(|(at, sequence)| Self::new(at, sequence)) + } +} + impl From<Instant> for Sequence { fn from(instant: Instant) -> Self { instant.sequence diff --git a/src/expire.rs b/src/expire.rs index eaedc44..1427a8d 100644 --- a/src/expire.rs +++ b/src/expire.rs @@ -16,6 +16,8 @@ pub async fn middleware( app.tokens().expire(&expired_at).await?; app.invites().expire(&expired_at).await?; app.messages().expire(&expired_at).await?; + app.messages().purge(&expired_at).await?; app.channels().expire(&expired_at).await?; + app.channels().purge(&expired_at).await?; Ok(next.run(req).await) } diff --git a/src/login/repo.rs b/src/login/repo.rs index 6d6510c..7d0fcb1 100644 --- a/src/login/repo.rs +++ b/src/login/repo.rs @@ -49,10 +49,7 @@ impl<'c> Logins<'c> { id: row.id, name: row.name, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, + created: Instant::new(row.created_at, row.created_sequence), }) .fetch_one(&mut *self.0) .await?; @@ -79,10 +76,7 @@ impl<'c> Logins<'c> { id: row.id, name: row.name, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, + created: Instant::new(row.created_at, row.created_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -107,10 +101,7 @@ impl<'c> Logins<'c> { id: row.id, name: row.name, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, + created: Instant::new(row.created_at, row.created_sequence), }) .fetch_all(&mut *self.0) .await?; diff --git a/src/message/app.rs b/src/message/app.rs index c1bcde6..4e50513 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -46,8 +46,17 @@ impl<'a> Messages<'a> { pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { let mut tx = self.db.begin().await?; + let message = tx + .messages() + .by_id(message) + .await + .not_found(|| DeleteError::NotFound(message.clone()))?; + message + .as_snapshot() + .ok_or_else(|| DeleteError::Deleted(message.id().clone()))?; + let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(message, &deleted).await?; + let message = tx.messages().delete(&message, &deleted).await?; tx.commit().await?; self.events.broadcast( @@ -91,6 +100,17 @@ impl<'a> Messages<'a> { Ok(()) } + + pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, purge after 6 hours. + let purge_at = relative_to.to_owned() - TimeDelta::hours(6); + + let mut tx = self.db.begin().await?; + tx.messages().purge(&purge_at).await?; + tx.commit().await?; + + Ok(()) + } } #[derive(Debug, thiserror::Error)] @@ -107,6 +127,8 @@ pub enum DeleteError { ChannelNotFound(channel::Id), #[error("message {0} not found")] NotFound(Id), + #[error("message {0} deleted")] + Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), } diff --git a/src/message/history.rs b/src/message/history.rs index 09e69b7..0424d0d 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -30,6 +30,11 @@ impl History { .filter(Sequence::up_to(resume_point.into())) .collect() } + + // Snapshot of this message as of all events recorded in this history. + pub fn as_snapshot(&self) -> Option<Message> { + self.events().collect() + } } // Events interface diff --git a/src/message/repo.rs b/src/message/repo.rs index 71c6d10..85a69fc 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -53,14 +53,12 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, - body: row.body, + body: row.body.unwrap_or_default(), + deleted_at: None, }, deleted: None, }) @@ -70,41 +68,37 @@ impl<'c> Messages<'c> { Ok(message) } - pub async fn in_channel( - &mut self, - channel: &channel::History, - resume_at: ResumePoint, - ) -> Result<Vec<History>, sqlx::Error> { + pub async fn live(&mut self, channel: &channel::History) -> Result<Vec<History>, sqlx::Error> { let channel_id = channel.id(); let messages = sqlx::query!( r#" select - channel as "channel: channel::Id", - sender as "sender: login::Id", + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", id as "id: Id", - body, - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence" + message.body, + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from message - where channel = $1 - and coalesce(sent_sequence <= $2, true) - order by sent_sequence + left join message_deleted as deleted + using (id) + where message.channel = $1 + and deleted.id is null "#, channel_id, - resume_at, ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, - body: row.body, + body: row.body.unwrap_or_default(), + deleted_at: row.deleted_at, }, - deleted: None, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -116,30 +110,32 @@ impl<'c> Messages<'c> { let messages = sqlx::query!( r#" select - channel as "channel: channel::Id", - sender as "sender: login::Id", + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", id as "id: Id", - body, - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence" + message.body, + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from message - where coalesce(sent_sequence <= $2, true) - order by sent_sequence + left join message_deleted as deleted + using (id) + where coalesce(message.sent_sequence <= $2, true) + order by message.sent_sequence "#, resume_at, ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, - body: row.body, + body: row.body.unwrap_or_default(), + deleted_at: row.deleted_at, }, - deleted: None, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -147,33 +143,35 @@ impl<'c> Messages<'c> { Ok(messages) } - async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> { + pub async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> { let message = sqlx::query!( r#" select - channel as "channel: channel::Id", - sender as "sender: login::Id", + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", id as "id: Id", - body, - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence" + message.body, + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message + left join message_deleted as deleted + using (id) where id = $1 "#, message, ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, - body: row.body, + body: row.body.unwrap_or_default(), + deleted_at: row.deleted_at, }, - deleted: None, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_one(&mut *self.0) .await?; @@ -183,39 +181,103 @@ impl<'c> Messages<'c> { pub async fn delete( &mut self, - message: &Id, + message: &History, deleted: &Instant, ) -> Result<History, sqlx::Error> { - let history = self.by_id(message).await?; + let id = message.id(); sqlx::query_scalar!( r#" - delete from message - where - id = $1 - returning 1 as "deleted: i64" + insert into message_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + returning 1 as "deleted: bool" "#, - history.message.id, + id, + deleted.at, + deleted.sequence, ) .fetch_one(&mut *self.0) .await?; - Ok(History { - deleted: Some(*deleted), - ..history - }) + // Small social responsibility hack here: when a message is deleted, its body is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at messages in the channel, cannot retrieve the + // "deleted" message by ignoring the deletion event. + sqlx::query_scalar!( + r#" + update message + set body = "" + where id = $1 + returning 1 as "blanked: bool" + "#, + id, + ) + .fetch_one(&mut *self.0) + .await?; + + let message = self.by_id(id).await?; + + Ok(message) } - pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let messages = sqlx::query_scalar!( r#" + delete from message_deleted + where deleted_at < $1 + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for message in messages { + sqlx::query!( + r#" + delete from message + where id = $1 + "#, + message, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<History>, sqlx::Error> { + let messages = sqlx::query!( + r#" select - id as "message: Id" + id as "id: Id", + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + message.body, + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message - where sent_at < $1 + left join message_deleted as deleted + using (id) + where message.sent_at < $1 + and deleted.id is null "#, expire_at, ) + .map(|row| History { + message: Message { + sent: Instant::new(row.sent_at, row.sent_sequence), + id: row.id, + channel: row.channel, + sender: row.sender, + body: row.body.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) .fetch_all(&mut *self.0) .await?; @@ -226,29 +288,31 @@ impl<'c> Messages<'c> { let messages = sqlx::query!( r#" select - channel as "channel: channel::Id", - sender as "sender: login::Id", id as "id: Id", - body, - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence" + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + message.body, + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from message + left join message_deleted as deleted + using (id) where coalesce(message.sent_sequence > $1, true) "#, resume_at, ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, - body: row.body, + body: row.body.unwrap_or_default(), + deleted_at: row.deleted_at, }, - deleted: None, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; diff --git a/src/message/routes/message.rs b/src/message/routes/message.rs index 059b8c1..fbef35a 100644 --- a/src/message/routes/message.rs +++ b/src/message/routes/message.rs @@ -33,9 +33,9 @@ pub mod delete { let Self(error) = self; #[allow(clippy::match_wildcard_for_single_variants)] match error { - DeleteError::ChannelNotFound(_) | DeleteError::NotFound(_) => { - NotFound(error).into_response() - } + DeleteError::ChannelNotFound(_) + | DeleteError::NotFound(_) + | DeleteError::Deleted(_) => NotFound(error).into_response(), other => Internal::from(other).into_response(), } } diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs index 0eb37bb..7300918 100644 --- a/src/message/snapshot.rs +++ b/src/message/snapshot.rs @@ -2,7 +2,7 @@ use super::{ event::{Event, Sent}, Id, }; -use crate::{channel, event::Instant, login}; +use crate::{channel, clock::DateTime, event::Instant, login}; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Message { @@ -12,6 +12,8 @@ pub struct Message { pub sender: login::Id, pub id: Id, pub body: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_at: Option<DateTime>, } impl Message { diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs index b678717..a1dda61 100644 --- a/src/test/fixtures/channel.rs +++ b/src/test/fixtures/channel.rs @@ -1,10 +1,17 @@ +use std::future; + use faker_rand::{ en_us::{addresses::CityName, names::FullName}, faker_impl_from_templates, }; use rand; -use crate::{app::App, channel::Channel, clock::RequestedAt}; +use crate::{ + app::App, + channel::{self, Channel}, + clock::RequestedAt, + event::Event, +}; pub async fn create(app: &App, created_at: &RequestedAt) -> Channel { let name = propose(); @@ -22,3 +29,17 @@ struct Name(String); faker_impl_from_templates! { Name; "{} {}", CityName, FullName; } + +pub fn events(event: Event) -> future::Ready<Option<channel::Event>> { + future::ready(match event { + Event::Channel(channel) => Some(channel), + _ => None, + }) +} + +pub fn created(event: channel::Event) -> future::Ready<Option<channel::event::Created>> { + future::ready(match event { + channel::Event::Created(event) => Some(event), + channel::Event::Deleted(_) => None, + }) +} diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs index 7fe2bf3..fa4fbc0 100644 --- a/src/test/fixtures/event.rs +++ b/src/test/fixtures/event.rs @@ -1,11 +1,8 @@ -use crate::{ - event::Event, - message::{Event::Sent, Message}, -}; +use crate::message::{Event, Message}; pub fn message_sent(event: &Event, message: &Message) -> bool { matches!( &event, - Event::Message(Sent(event)) if message == &event.into() + Event::Sent(event) if message == &event.into() ) } diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs deleted file mode 100644 index 84d27b0..0000000 --- a/src/test/fixtures/filter.rs +++ /dev/null @@ -1,11 +0,0 @@ -use futures::future; - -use crate::{channel::Event::Created, event::Event, message::Event::Sent}; - -pub fn messages() -> impl FnMut(&Event) -> future::Ready<bool> { - |event| future::ready(matches!(event, Event::Message(Sent(_)))) -} - -pub fn created() -> impl FnMut(&Event) -> future::Ready<bool> { - |event| future::ready(matches!(event, Event::Channel(Created(_)))) -} diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs index 381b10b..eb00e7c 100644 --- a/src/test/fixtures/message.rs +++ b/src/test/fixtures/message.rs @@ -1,6 +1,15 @@ +use std::future; + use faker_rand::lorem::Paragraphs; -use crate::{app::App, channel::Channel, clock::RequestedAt, login::Login, message::Message}; +use crate::{ + app::App, + channel::Channel, + clock::RequestedAt, + event::Event, + login::Login, + message::{self, Message}, +}; pub async fn send(app: &App, channel: &Channel, login: &Login, sent_at: &RequestedAt) -> Message { let body = propose(); @@ -14,3 +23,10 @@ pub async fn send(app: &App, channel: &Channel, login: &Login, sent_at: &Request pub fn propose() -> String { rand::random::<Paragraphs>().to_string() } + +pub fn events(event: Event) -> future::Ready<Option<message::Event>> { + future::ready(match event { + Event::Message(event) => Some(event), + _ => None, + }) +} diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 41f7e13..9658831 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -4,7 +4,6 @@ use crate::{app::App, clock::RequestedAt, db}; pub mod channel; pub mod event; -pub mod filter; pub mod future; pub mod identity; pub mod login; diff --git a/src/token/repo/auth.rs b/src/token/repo/auth.rs index 9aee81f..88d0878 100644 --- a/src/token/repo/auth.rs +++ b/src/token/repo/auth.rs @@ -40,10 +40,7 @@ impl<'t> Auth<'t> { id: row.id, name: row.name, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, + created: Instant::new(row.created_at, row.created_sequence), }, row.password_hash, ) |
