diff options
Diffstat (limited to 'src/channel/repo.rs')
| -rw-r--r-- | src/channel/repo.rs | 179 |
1 files changed, 121 insertions, 58 deletions
diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 2f57581..1cd1c80 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -28,7 +28,7 @@ impl<'c> Channels<'c> { values ($1, $2, $3, $4) returning id as "id: Id", - name, + name as "name!", -- known non-null as we just set it created_at as "created_at: DateTime", created_sequence as "created_sequence: Sequence" "#, @@ -41,11 +41,9 @@ impl<'c> Channels<'c> { channel: Channel { id: row.id, name: row.name, + deleted_at: None, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, + created: Instant::new(row.created_at, row.created_sequence), deleted: None, }) .fetch_one(&mut *self.0) @@ -59,10 +57,14 @@ impl<'c> Channels<'c> { r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel + left join channel_deleted as deleted + using (id) where id = $1 "#, channel, @@ -70,13 +72,11 @@ impl<'c> Channels<'c> { .map(|row| History { channel: Channel { id: row.id, - name: row.name, + name: row.name.unwrap_or_default(), + deleted_at: row.deleted_at, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_one(&mut *self.0) .await?; @@ -89,11 +89,15 @@ impl<'c> Channels<'c> { r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from channel - where coalesce(created_sequence <= $1, true) + left join channel_deleted as deleted + using (id) + where coalesce(channel.created_sequence <= $1, true) order by channel.name "#, resume_at, @@ -101,13 +105,11 @@ impl<'c> Channels<'c> { .map(|row| History { channel: Channel { id: row.id, - name: row.name, + name: row.name.unwrap_or_default(), + deleted_at: row.deleted_at, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -123,24 +125,26 @@ impl<'c> Channels<'c> { r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from channel - where coalesce(created_sequence > $1, true) + left join channel_deleted as deleted + using (id) + where coalesce(channel.created_sequence > $1, true) "#, resume_at, ) .map(|row| History { channel: Channel { id: row.id, - name: row.name, + name: row.name.unwrap_or_default(), + deleted_at: row.deleted_at, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -150,50 +154,109 @@ impl<'c> Channels<'c> { pub async fn delete( &mut self, - channel: &Id, + channel: &History, deleted: &Instant, ) -> Result<History, sqlx::Error> { - let channel = sqlx::query!( + let id = channel.id(); + sqlx::query_scalar!( r#" - delete from channel + insert into channel_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + returning 1 as "deleted: bool" + "#, + id, + deleted.at, + deleted.sequence, + ) + .fetch_one(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a channel is deleted, its name is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at channels via the API, cannot retrieve the + // "deleted" channel's information by ignoring the deletion event. + // + // This also avoids the need for a separate name reservation table to ensure that live channels have unique names, since the `channel` table's name field is unique over non-null values. + sqlx::query_scalar!( + r#" + update channel + set name = null where id = $1 - returning - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + returning 1 as "updated: bool" "#, - channel, + id, ) - .map(|row| History { - channel: Channel { - id: row.id, - name: row.name, - }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: Some(*deleted), - }) .fetch_one(&mut *self.0) .await?; + let channel = self.by_id(id).await?; + Ok(channel) } - pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let channels = sqlx::query_scalar!( r#" + with has_messages as ( + select channel + from message + group by channel + ) + delete from channel_deleted + where deleted_at < $1 + and id not in has_messages + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for channel in channels { + // Wanted: a way to batch these up into one query. + sqlx::query!( + r#" + delete from channel + where id = $1 + "#, + channel, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, sqlx::Error> { + let channels = sqlx::query!( + r#" select - channel.id as "id: Id" + channel.id as "id: Id", + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from channel - left join message - where created_at < $1 + left join channel_deleted as deleted + using (id) + left join message + where channel.created_at < $1 and message.id is null + and deleted.id is null "#, expired_at, ) + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) .fetch_all(&mut *self.0) .await?; |
