summaryrefslogtreecommitdiff
path: root/src/channel/repo.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel/repo.rs')
-rw-r--r--src/channel/repo.rs196
1 files changed, 142 insertions, 54 deletions
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index 2f57581..4b10c54 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -41,11 +41,9 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: None,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
+ created: Instant::new(row.created_at, row.created_sequence),
deleted: None,
})
.fetch_one(&mut *self.0)
@@ -54,15 +52,35 @@ impl<'c> Channels<'c> {
Ok(channel)
}
+ pub async fn reserve_name(&mut self, channel: &History, name: &str) -> Result<(), sqlx::Error> {
+ let channel = channel.id();
+ sqlx::query!(
+ r#"
+ insert into channel_name_reservation (id, name)
+ values ($1, $2)
+ "#,
+ channel,
+ name,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> {
let channel = sqlx::query!(
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
+ left join channel_deleted as deleted
+ using (id)
where id = $1
"#,
channel,
@@ -71,12 +89,10 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: row.deleted_at,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_one(&mut *self.0)
.await?;
@@ -89,11 +105,15 @@ impl<'c> Channels<'c> {
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
- where coalesce(created_sequence <= $1, true)
+ left join channel_deleted as deleted
+ using (id)
+ where coalesce(channel.created_sequence <= $1, true)
order by channel.name
"#,
resume_at,
@@ -102,12 +122,10 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: row.deleted_at,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -123,11 +141,15 @@ impl<'c> Channels<'c> {
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
- where coalesce(created_sequence > $1, true)
+ left join channel_deleted as deleted
+ using (id)
+ where coalesce(channel.created_sequence > $1, true)
"#,
resume_at,
)
@@ -135,12 +157,10 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: row.deleted_at,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -150,50 +170,118 @@ impl<'c> Channels<'c> {
pub async fn delete(
&mut self,
- channel: &Id,
+ channel: &History,
deleted: &Instant,
) -> Result<History, sqlx::Error> {
- let channel = sqlx::query!(
+ let id = channel.id();
+ sqlx::query_scalar!(
r#"
- delete from channel
+ delete from channel_name_reservation
where id = $1
- returning
- id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ returning 1 as "deleted: bool"
"#,
- channel,
+ id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ sqlx::query_scalar!(
+ r#"
+ insert into channel_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ returning 1 as "deleted: bool"
+ "#,
+ id,
+ deleted.at,
+ deleted.sequence,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a channel is deleted, its name is
+ // retconned to have been the empty string. Someone reading the event stream
+ // afterwards, or looking at channels via the API, cannot retrieve the
+ // "deleted" channel's information by ignoring the deletion event.
+ sqlx::query_scalar!(
+ r#"
+ update channel
+ set name = ""
+ where id = $1
+ returning 1 as "updated: bool"
+ "#,
+ id,
)
- .map(|row| History {
- channel: Channel {
- id: row.id,
- name: row.name,
- },
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: Some(*deleted),
- })
.fetch_one(&mut *self.0)
.await?;
+ let channel = self.by_id(id).await?;
+
Ok(channel)
}
- pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> {
+ pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let channels = sqlx::query_scalar!(
r#"
+ with has_messages as (
+ select channel
+ from message
+ group by channel
+ )
+ delete from channel_deleted
+ where deleted_at < $1
+ and id not in has_messages
+ returning id as "id: Id"
+ "#,
+ purge_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ for channel in channels {
+ // Wanted: a way to batch these up into one query.
+ sqlx::query!(
+ r#"
+ delete from channel
+ where id = $1
+ "#,
+ channel,
+ )
+ .execute(&mut *self.0)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, sqlx::Error> {
+ let channels = sqlx::query!(
+ r#"
select
- channel.id as "id: Id"
+ channel.id as "id: Id",
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
- left join message
- where created_at < $1
+ left join channel_deleted as deleted
+ using (id)
+ left join message
+ where channel.created_at < $1
and message.id is null
+ and deleted.id is null
"#,
expired_at,
)
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ deleted_at: row.deleted_at,
+ },
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
.fetch_all(&mut *self.0)
.await?;