summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-17 01:45:58 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-17 02:03:14 -0400
commit777e4281431a036eb663b5eec70f347b7425737d (patch)
treeeeb9ad74c6b706db3bad146e136067c51ff7e4b0 /src/channel
parentea74daca4809e4008dd8d01039db9fff3be659d9 (diff)
Retain deleted messages and channels temporarily, to preserve events for replay.
Previously, when a channel (message) was deleted, `hi` would send events to all _connected_ clients to inform them of the deletion, then delete all memory of the channel (message). Any disconnected client, on reconnecting, would not receive the deletion event, and would de-synch with the service. The creation events were also immediately retconned out of the event stream, as well. With this change, `hi` keeps a record of deleted channels (messages). When replaying events, these records are used to replay the deletion event. After 7 days, the retained data is deleted, both to keep storage under control and to conform to users' expectations that deleted means gone. To match users' likely intuitions about what deletion does, deleting a channel (message) _does_ immediately delete some of its associated data. Channels' names are blanked, and messages' bodies are also blanked. When the event stream is replayed, the original channel.created (message.sent) event is "tombstoned", with an additional `deleted_at` field to inform clients. The included client does not use this field, at least yet. The migration is, once again, screamingingly complicated due to sqlite's limited ALTER TABLE … ALTER COLUMN support. This change also contains capabilities that would allow the API to return 410 Gone for deleted channels or messages, instead of 404. I did experiment with this, but it's tricky to do pervasively, especially since most app-level interfaces return an `Option<Channel>` or `Option<Message>`. Redesigning these to return either `Ok(Channel)` (`Ok(Message)`) or `Err(Error::NotFound)` or `Err(Error::Deleted)` is more work than I wanted to take on for this change, and the utility of 410 Gone responses is not obvious to me. We have other, more pressing API design warts to address.
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs30
-rw-r--r--src/channel/history.rs5
-rw-r--r--src/channel/repo.rs196
-rw-r--r--src/channel/routes/channel/delete.rs2
-rw-r--r--src/channel/routes/channel/test.rs6
-rw-r--r--src/channel/routes/test.rs106
-rw-r--r--src/channel/snapshot.rs3
7 files changed, 266 insertions, 82 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 46eaba8..0409076 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -23,9 +23,9 @@ impl<'a> Channels<'a> {
pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> {
let mut tx = self.db.begin().await?;
let created = tx.sequence().next(created_at).await?;
- let channel = tx
- .channels()
- .create(name, &created)
+ let channel = tx.channels().create(name, &created).await?;
+ tx.channels()
+ .reserve_name(&channel, name)
.await
.duplicate(|| CreateError::DuplicateName(name.into()))?;
tx.commit().await?;
@@ -43,7 +43,7 @@ impl<'a> Channels<'a> {
let channel = tx.channels().by_id(channel).await.optional()?;
tx.commit().await?;
- Ok(channel.iter().flat_map(History::events).collect())
+ Ok(channel.as_ref().and_then(History::as_snapshot))
}
pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
@@ -54,13 +54,16 @@ impl<'a> Channels<'a> {
.by_id(channel)
.await
.not_found(|| Error::NotFound(channel.clone()))?;
+ channel
+ .as_snapshot()
+ .ok_or_else(|| Error::Deleted(channel.id().clone()))?;
let mut events = Vec::new();
- let messages = tx.messages().in_channel(&channel, None).await?;
+ let messages = tx.messages().live(&channel).await?;
for message in messages {
let deleted = tx.sequence().next(deleted_at).await?;
- let message = tx.messages().delete(message.id(), &deleted).await?;
+ let message = tx.messages().delete(&message, &deleted).await?;
events.extend(
message
.events()
@@ -70,7 +73,7 @@ impl<'a> Channels<'a> {
}
let deleted = tx.sequence().next(deleted_at).await?;
- let channel = tx.channels().delete(channel.id(), &deleted).await?;
+ let channel = tx.channels().delete(&channel, &deleted).await?;
events.extend(
channel
.events()
@@ -115,6 +118,17 @@ impl<'a> Channels<'a> {
Ok(())
}
+
+ pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, purge after 7 days.
+ let purge_at = relative_to.to_owned() - TimeDelta::days(7);
+
+ let mut tx = self.db.begin().await?;
+ tx.channels().purge(&purge_at).await?;
+ tx.commit().await?;
+
+ Ok(())
+ }
}
#[derive(Debug, thiserror::Error)]
@@ -129,6 +143,8 @@ pub enum CreateError {
pub enum Error {
#[error("channel {0} not found")]
NotFound(Id),
+ #[error("channel {0} deleted")]
+ Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
}
diff --git a/src/channel/history.rs b/src/channel/history.rs
index 78b3437..4b9fcc7 100644
--- a/src/channel/history.rs
+++ b/src/channel/history.rs
@@ -31,6 +31,11 @@ impl History {
.filter(Sequence::up_to(resume_point.into()))
.collect()
}
+
+ // Snapshot of this channel as of all events recorded in this history.
+ pub fn as_snapshot(&self) -> Option<Channel> {
+ self.events().collect()
+ }
}
// Event factories
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index 2f57581..4b10c54 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -41,11 +41,9 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: None,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
+ created: Instant::new(row.created_at, row.created_sequence),
deleted: None,
})
.fetch_one(&mut *self.0)
@@ -54,15 +52,35 @@ impl<'c> Channels<'c> {
Ok(channel)
}
+ pub async fn reserve_name(&mut self, channel: &History, name: &str) -> Result<(), sqlx::Error> {
+ let channel = channel.id();
+ sqlx::query!(
+ r#"
+ insert into channel_name_reservation (id, name)
+ values ($1, $2)
+ "#,
+ channel,
+ name,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> {
let channel = sqlx::query!(
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
+ left join channel_deleted as deleted
+ using (id)
where id = $1
"#,
channel,
@@ -71,12 +89,10 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: row.deleted_at,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_one(&mut *self.0)
.await?;
@@ -89,11 +105,15 @@ impl<'c> Channels<'c> {
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
- where coalesce(created_sequence <= $1, true)
+ left join channel_deleted as deleted
+ using (id)
+ where coalesce(channel.created_sequence <= $1, true)
order by channel.name
"#,
resume_at,
@@ -102,12 +122,10 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: row.deleted_at,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -123,11 +141,15 @@ impl<'c> Channels<'c> {
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
- where coalesce(created_sequence > $1, true)
+ left join channel_deleted as deleted
+ using (id)
+ where coalesce(channel.created_sequence > $1, true)
"#,
resume_at,
)
@@ -135,12 +157,10 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: row.deleted_at,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -150,50 +170,118 @@ impl<'c> Channels<'c> {
pub async fn delete(
&mut self,
- channel: &Id,
+ channel: &History,
deleted: &Instant,
) -> Result<History, sqlx::Error> {
- let channel = sqlx::query!(
+ let id = channel.id();
+ sqlx::query_scalar!(
r#"
- delete from channel
+ delete from channel_name_reservation
where id = $1
- returning
- id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ returning 1 as "deleted: bool"
"#,
- channel,
+ id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ sqlx::query_scalar!(
+ r#"
+ insert into channel_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ returning 1 as "deleted: bool"
+ "#,
+ id,
+ deleted.at,
+ deleted.sequence,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a channel is deleted, its name is
+ // retconned to have been the empty string. Someone reading the event stream
+ // afterwards, or looking at channels via the API, cannot retrieve the
+ // "deleted" channel's information by ignoring the deletion event.
+ sqlx::query_scalar!(
+ r#"
+ update channel
+ set name = ""
+ where id = $1
+ returning 1 as "updated: bool"
+ "#,
+ id,
)
- .map(|row| History {
- channel: Channel {
- id: row.id,
- name: row.name,
- },
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: Some(*deleted),
- })
.fetch_one(&mut *self.0)
.await?;
+ let channel = self.by_id(id).await?;
+
Ok(channel)
}
- pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> {
+ pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let channels = sqlx::query_scalar!(
r#"
+ with has_messages as (
+ select channel
+ from message
+ group by channel
+ )
+ delete from channel_deleted
+ where deleted_at < $1
+ and id not in has_messages
+ returning id as "id: Id"
+ "#,
+ purge_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ for channel in channels {
+ // Wanted: a way to batch these up into one query.
+ sqlx::query!(
+ r#"
+ delete from channel
+ where id = $1
+ "#,
+ channel,
+ )
+ .execute(&mut *self.0)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, sqlx::Error> {
+ let channels = sqlx::query!(
+ r#"
select
- channel.id as "id: Id"
+ channel.id as "id: Id",
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
- left join message
- where created_at < $1
+ left join channel_deleted as deleted
+ using (id)
+ left join message
+ where channel.created_at < $1
and message.id is null
+ and deleted.id is null
"#,
expired_at,
)
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ deleted_at: row.deleted_at,
+ },
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
.fetch_all(&mut *self.0)
.await?;
diff --git a/src/channel/routes/channel/delete.rs b/src/channel/routes/channel/delete.rs
index efac0c0..5f40ddf 100644
--- a/src/channel/routes/channel/delete.rs
+++ b/src/channel/routes/channel/delete.rs
@@ -32,7 +32,7 @@ impl IntoResponse for Error {
let Self(error) = self;
#[allow(clippy::match_wildcard_for_single_variants)]
match error {
- app::Error::NotFound(_) => NotFound(error).into_response(),
+ app::Error::NotFound(_) | app::Error::Deleted(_) => NotFound(error).into_response(),
other => Internal::from(other).into_response(),
}
}
diff --git a/src/channel/routes/channel/test.rs b/src/channel/routes/channel/test.rs
index bc02b20..c6541cd 100644
--- a/src/channel/routes/channel/test.rs
+++ b/src/channel/routes/channel/test.rs
@@ -4,7 +4,7 @@ use futures::stream::StreamExt;
use super::post;
use crate::{
channel,
- event::{self, Sequenced},
+ event::Sequenced,
message::{self, app::SendError},
test::fixtures::{self, future::Immediately as _},
};
@@ -45,7 +45,7 @@ async fn messages_in_order() {
.subscribe(None)
.await
.expect("subscribing to a valid channel")
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.take(requests.len());
let events = events.collect::<Vec<_>>().immediately().await;
@@ -54,7 +54,7 @@ async fn messages_in_order() {
assert_eq!(*sent_at, event.at());
assert!(matches!(
event,
- event::Event::Message(message::Event::Sent(event))
+ message::Event::Sent(event)
if event.message.sender == sender.id
&& event.message.body == message
));
diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs
index 81f1465..ffd8484 100644
--- a/src/channel/routes/test.rs
+++ b/src/channel/routes/test.rs
@@ -1,10 +1,11 @@
+use std::future;
+
use axum::extract::{Json, State};
use futures::stream::StreamExt as _;
use super::post;
use crate::{
- channel::{self, app},
- event,
+ channel::app,
test::fixtures::{self, future::Immediately as _},
};
@@ -19,29 +20,35 @@ async fn new_channel() {
let name = fixtures::channel::propose();
let request = post::Request { name: name.clone() };
- let Json(response_channel) =
- post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect("new channel in an empty app");
+ let Json(response) = post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("creating a channel in an empty app succeeds");
// Verify the structure of the response
- assert_eq!(name, response_channel.name);
+ assert_eq!(name, response.name);
// Verify the semantics
let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- assert!(snapshot
- .channels
- .iter()
- .any(|channel| channel.name == response_channel.name && channel.id == response_channel.id));
+ assert!(snapshot.channels.iter().any(|channel| channel == &response));
+
+ let channel = app
+ .channels()
+ .get(&response.id)
+ .await
+ .expect("searching for channels by ID never fails")
+ .expect("the newly-created channel exists");
+ assert_eq!(response, channel);
let mut events = app
.events()
.subscribe(None)
.await
.expect("subscribing never fails")
- .filter(fixtures::filter::created());
+ .filter_map(fixtures::channel::events)
+ .filter_map(fixtures::channel::created)
+ .filter(|event| future::ready(event.channel == response));
let event = events
.next()
@@ -49,11 +56,7 @@ async fn new_channel() {
.await
.expect("creation event published");
- assert!(matches!(
- event,
- event::Event::Channel(channel::Event::Created(event))
- if event.channel == response_channel
- ));
+ assert_eq!(event.channel, response);
}
#[tokio::test]
@@ -81,3 +84,72 @@ async fn duplicate_name() {
app::CreateError::DuplicateName(name) if channel.name == name
));
}
+
+#[tokio::test]
+async fn name_reusable_after_delete() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::login::create(&app, &fixtures::now()).await;
+ let name = fixtures::channel::propose();
+
+ // Call the endpoint (first time)
+
+ let request = post::Request { name: name.clone() };
+ let Json(response) = post::handler(
+ State(app.clone()),
+ creator.clone(),
+ fixtures::now(),
+ Json(request),
+ )
+ .await
+ .expect("new channel in an empty app");
+
+ // Delete the channel
+
+ app.channels()
+ .delete(&response.id, &fixtures::now())
+ .await
+ .expect("deleting a newly-created channel succeeds");
+
+ // Call the endpoint (second time)
+
+ let request = post::Request { name: name.clone() };
+ let Json(response) = post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("new channel in an empty app");
+
+ // Verify the structure of the response
+
+ assert_eq!(name, response.name);
+
+ // Verify the semantics
+
+ let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
+ assert!(snapshot.channels.iter().any(|channel| channel == &response));
+
+ let channel = app
+ .channels()
+ .get(&response.id)
+ .await
+ .expect("searching for channels by ID never fails")
+ .expect("the newly-created channel exists");
+ assert_eq!(response, channel);
+
+ let mut events = app
+ .events()
+ .subscribe(None)
+ .await
+ .expect("subscribing never fails")
+ .filter_map(fixtures::channel::events)
+ .filter_map(fixtures::channel::created)
+ .filter(|event| future::ready(event.channel == response));
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("creation event published");
+
+ assert_eq!(event.channel, response);
+}
diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs
index d4d1d27..2b7d89a 100644
--- a/src/channel/snapshot.rs
+++ b/src/channel/snapshot.rs
@@ -2,11 +2,14 @@ use super::{
event::{Created, Event},
Id,
};
+use crate::clock::DateTime;
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Channel {
pub id: Id,
pub name: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub deleted_at: Option<DateTime>,
}
impl Channel {