diff options
Diffstat (limited to 'src/channel/repo.rs')
| -rw-r--r-- | src/channel/repo.rs | 196 |
1 files changed, 142 insertions, 54 deletions
diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 2f57581..4b10c54 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -41,11 +41,9 @@ impl<'c> Channels<'c> { channel: Channel { id: row.id, name: row.name, + deleted_at: None, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, + created: Instant::new(row.created_at, row.created_sequence), deleted: None, }) .fetch_one(&mut *self.0) @@ -54,15 +52,35 @@ impl<'c> Channels<'c> { Ok(channel) } + pub async fn reserve_name(&mut self, channel: &History, name: &str) -> Result<(), sqlx::Error> { + let channel = channel.id(); + sqlx::query!( + r#" + insert into channel_name_reservation (id, name) + values ($1, $2) + "#, + channel, + name, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) + } + pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> { let channel = sqlx::query!( r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from channel + left join channel_deleted as deleted + using (id) where id = $1 "#, channel, @@ -71,12 +89,10 @@ impl<'c> Channels<'c> { channel: Channel { id: row.id, name: row.name, + deleted_at: row.deleted_at, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_one(&mut *self.0) .await?; @@ -89,11 +105,15 @@ impl<'c> Channels<'c> { r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from channel - where coalesce(created_sequence <= $1, true) + left join channel_deleted as deleted + using (id) + where coalesce(channel.created_sequence <= $1, true) order by channel.name "#, resume_at, @@ -102,12 +122,10 @@ impl<'c> Channels<'c> { channel: Channel { id: row.id, name: row.name, + deleted_at: row.deleted_at, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -123,11 +141,15 @@ impl<'c> Channels<'c> { r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from channel - where coalesce(created_sequence > $1, true) + left join channel_deleted as deleted + using (id) + where coalesce(channel.created_sequence > $1, true) "#, resume_at, ) @@ -135,12 +157,10 @@ impl<'c> Channels<'c> { channel: Channel { id: row.id, name: row.name, + deleted_at: row.deleted_at, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -150,50 +170,118 @@ impl<'c> Channels<'c> { pub async fn delete( &mut self, - channel: &Id, + channel: &History, deleted: &Instant, ) -> Result<History, sqlx::Error> { - let channel = sqlx::query!( + let id = channel.id(); + sqlx::query_scalar!( r#" - delete from channel + delete from channel_name_reservation where id = $1 - returning - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + returning 1 as "deleted: bool" "#, - channel, + id, + ) + .fetch_one(&mut *self.0) + .await?; + + sqlx::query_scalar!( + r#" + insert into channel_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + returning 1 as "deleted: bool" + "#, + id, + deleted.at, + deleted.sequence, + ) + .fetch_one(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a channel is deleted, its name is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at channels via the API, cannot retrieve the + // "deleted" channel's information by ignoring the deletion event. + sqlx::query_scalar!( + r#" + update channel + set name = "" + where id = $1 + returning 1 as "updated: bool" + "#, + id, ) - .map(|row| History { - channel: Channel { - id: row.id, - name: row.name, - }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: Some(*deleted), - }) .fetch_one(&mut *self.0) .await?; + let channel = self.by_id(id).await?; + Ok(channel) } - pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let channels = sqlx::query_scalar!( r#" + with has_messages as ( + select channel + from message + group by channel + ) + delete from channel_deleted + where deleted_at < $1 + and id not in has_messages + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for channel in channels { + // Wanted: a way to batch these up into one query. + sqlx::query!( + r#" + delete from channel + where id = $1 + "#, + channel, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, sqlx::Error> { + let channels = sqlx::query!( + r#" select - channel.id as "id: Id" + channel.id as "id: Id", + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from channel - left join message - where created_at < $1 + left join channel_deleted as deleted + using (id) + left join message + where channel.created_at < $1 and message.id is null + and deleted.id is null "#, expired_at, ) + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + deleted_at: row.deleted_at, + }, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) .fetch_all(&mut *self.0) .await?; |
