diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-17 01:45:58 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-17 02:03:14 -0400 |
| commit | 777e4281431a036eb663b5eec70f347b7425737d (patch) | |
| tree | eeb9ad74c6b706db3bad146e136067c51ff7e4b0 /src/event | |
| parent | ea74daca4809e4008dd8d01039db9fff3be659d9 (diff) | |
Retain deleted messages and channels temporarily, to preserve events for replay.
Previously, when a channel (message) was deleted, `hi` would send events to all _connected_ clients to inform them of the deletion, then delete all memory of the channel (message). Any disconnected client, on reconnecting, would not receive the deletion event, and would de-synch with the service. The creation events were also immediately retconned out of the event stream, as well.
With this change, `hi` keeps a record of deleted channels (messages). When replaying events, these records are used to replay the deletion event. After 7 days, the retained data is deleted, both to keep storage under control and to conform to users' expectations that deleted means gone.
To match users' likely intuitions about what deletion does, deleting a channel (message) _does_ immediately delete some of its associated data. Channels' names are blanked, and messages' bodies are also blanked. When the event stream is replayed, the original channel.created (message.sent) event is "tombstoned", with an additional `deleted_at` field to inform clients. The included client does not use this field, at least yet.
The migration is, once again, screamingingly complicated due to sqlite's limited ALTER TABLE … ALTER COLUMN support.
This change also contains capabilities that would allow the API to return 410 Gone for deleted channels or messages, instead of 404. I did experiment with this, but it's tricky to do pervasively, especially since most app-level interfaces return an `Option<Channel>` or `Option<Message>`. Redesigning these to return either `Ok(Channel)` (`Ok(Message)`) or `Err(Error::NotFound)` or `Err(Error::Deleted)` is more work than I wanted to take on for this change, and the utility of 410 Gone responses is not obvious to me. We have other, more pressing API design warts to address.
Diffstat (limited to 'src/event')
| -rw-r--r-- | src/event/repo.rs | 5 | ||||
| -rw-r--r-- | src/event/routes/test.rs | 33 | ||||
| -rw-r--r-- | src/event/sequence.rs | 11 |
3 files changed, 31 insertions, 18 deletions
diff --git a/src/event/repo.rs b/src/event/repo.rs index 40d6a53..56beeea 100644 --- a/src/event/repo.rs +++ b/src/event/repo.rs @@ -29,10 +29,7 @@ impl<'c> Sequences<'c> { .fetch_one(&mut *self.0) .await?; - Ok(Instant { - at: *at, - sequence: next, - }) + Ok(Instant::new(*at, next)) } pub async fn current(&mut self) -> Result<Sequence, sqlx::Error> { diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs index 249f5c2..e6a8b9d 100644 --- a/src/event/routes/test.rs +++ b/src/event/routes/test.rs @@ -31,7 +31,7 @@ async fn includes_historical_message() { // Verify the structure of the response. let event = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .next() .immediately() .await @@ -62,7 +62,7 @@ async fn includes_live_message() { let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; let event = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .next() .immediately() .await @@ -104,7 +104,7 @@ async fn includes_multiple_channels() { // Verify the structure of the response. let events = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .take(messages.len()) .collect::<Vec<_>>() .immediately() @@ -141,13 +141,15 @@ async fn sequential_messages() { // Verify the structure of the response. - let mut events = events.filter(|event| { - future::ready( - messages - .iter() - .any(|message| fixtures::event::message_sent(event, message)), - ) - }); + let mut events = events + .filter_map(fixtures::message::events) + .filter(|event| { + future::ready( + messages + .iter() + .any(|message| fixtures::event::message_sent(event, message)), + ) + }); // Verify delivery in order for message in &messages { @@ -193,7 +195,7 @@ async fn resumes_from() { .expect("subscribe never fails"); let event = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .next() .immediately() .await @@ -217,6 +219,7 @@ async fn resumes_from() { // Verify the structure of the response. let events = resumed + .filter_map(fixtures::message::events) .take(later_messages.len()) .collect::<Vec<_>>() .immediately() @@ -275,7 +278,7 @@ async fn serial_resume() { .expect("subscribe never fails"); let events = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .take(initial_messages.len()) .collect::<Vec<_>>() .immediately() @@ -313,7 +316,7 @@ async fn serial_resume() { .expect("subscribe never fails"); let events = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .take(resume_messages.len()) .collect::<Vec<_>>() .immediately() @@ -351,7 +354,7 @@ async fn serial_resume() { .expect("subscribe never fails"); let events = events - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .take(final_messages.len()) .collect::<Vec<_>>() .immediately() @@ -401,6 +404,7 @@ async fn terminates_on_token_expiry() { ]; assert!(events + .filter_map(fixtures::message::events) .filter(|event| future::ready( messages .iter() @@ -452,6 +456,7 @@ async fn terminates_on_logout() { ]; assert!(events + .filter_map(fixtures::message::events) .filter(|event| future::ready( messages .iter() diff --git a/src/event/sequence.rs b/src/event/sequence.rs index bf6d5b8..9bc399b 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -10,6 +10,17 @@ pub struct Instant { pub sequence: Sequence, } +impl Instant { + pub fn new(at: DateTime, sequence: Sequence) -> Self { + Self { at, sequence } + } + + pub fn optional(at: Option<DateTime>, sequence: Option<Sequence>) -> Option<Self> { + at.zip(sequence) + .map(|(at, sequence)| Self::new(at, sequence)) + } +} + impl From<Instant> for Sequence { fn from(instant: Instant) -> Self { instant.sequence |
