summaryrefslogtreecommitdiff
path: root/src/message/repo.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/message/repo.rs')
-rw-r--r--src/message/repo.rs204
1 files changed, 134 insertions, 70 deletions
diff --git a/src/message/repo.rs b/src/message/repo.rs
index 71c6d10..14ff7bf 100644
--- a/src/message/repo.rs
+++ b/src/message/repo.rs
@@ -53,14 +53,12 @@ impl<'c> Messages<'c> {
)
.map(|row| History {
message: Message {
- sent: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
+ sent: Instant::new(row.sent_at, row.sent_sequence),
channel: row.channel,
sender: row.sender,
id: row.id,
body: row.body,
+ deleted_at: None,
},
deleted: None,
})
@@ -70,41 +68,37 @@ impl<'c> Messages<'c> {
Ok(message)
}
- pub async fn in_channel(
- &mut self,
- channel: &channel::History,
- resume_at: ResumePoint,
- ) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn live(&mut self, channel: &channel::History) -> Result<Vec<History>, sqlx::Error> {
let channel_id = channel.id();
let messages = sqlx::query!(
r#"
select
- channel as "channel: channel::Id",
- sender as "sender: login::Id",
+ message.channel as "channel: channel::Id",
+ message.sender as "sender: login::Id",
id as "id: Id",
- body,
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence"
+ message.body,
+ message.sent_at as "sent_at: DateTime",
+ message.sent_sequence as "sent_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from message
- where channel = $1
- and coalesce(sent_sequence <= $2, true)
- order by sent_sequence
+ left join message_deleted as deleted
+ using (id)
+ where message.channel = $1
+ and deleted.id is null
"#,
channel_id,
- resume_at,
)
.map(|row| History {
message: Message {
- sent: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
+ sent: Instant::new(row.sent_at, row.sent_sequence),
channel: row.channel,
sender: row.sender,
id: row.id,
body: row.body,
+ deleted_at: row.deleted_at,
},
- deleted: None,
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -116,30 +110,32 @@ impl<'c> Messages<'c> {
let messages = sqlx::query!(
r#"
select
- channel as "channel: channel::Id",
- sender as "sender: login::Id",
+ message.channel as "channel: channel::Id",
+ message.sender as "sender: login::Id",
id as "id: Id",
- body,
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence"
+ message.body,
+ message.sent_at as "sent_at: DateTime",
+ message.sent_sequence as "sent_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from message
- where coalesce(sent_sequence <= $2, true)
- order by sent_sequence
+ left join message_deleted as deleted
+ using (id)
+ where coalesce(message.sent_sequence <= $2, true)
+ order by message.sent_sequence
"#,
resume_at,
)
.map(|row| History {
message: Message {
- sent: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
+ sent: Instant::new(row.sent_at, row.sent_sequence),
channel: row.channel,
sender: row.sender,
id: row.id,
body: row.body,
+ deleted_at: row.deleted_at,
},
- deleted: None,
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -147,33 +143,35 @@ impl<'c> Messages<'c> {
Ok(messages)
}
- async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> {
+ pub async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> {
let message = sqlx::query!(
r#"
select
- channel as "channel: channel::Id",
- sender as "sender: login::Id",
+ message.channel as "channel: channel::Id",
+ message.sender as "sender: login::Id",
id as "id: Id",
- body,
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence"
+ message.body,
+ message.sent_at as "sent_at: DateTime",
+ message.sent_sequence as "sent_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from message
+ left join message_deleted as deleted
+ using (id)
where id = $1
"#,
message,
)
.map(|row| History {
message: Message {
- sent: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
+ sent: Instant::new(row.sent_at, row.sent_sequence),
channel: row.channel,
sender: row.sender,
id: row.id,
body: row.body,
+ deleted_at: row.deleted_at,
},
- deleted: None,
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_one(&mut *self.0)
.await?;
@@ -183,39 +181,103 @@ impl<'c> Messages<'c> {
pub async fn delete(
&mut self,
- message: &Id,
+ message: &History,
deleted: &Instant,
) -> Result<History, sqlx::Error> {
- let history = self.by_id(message).await?;
+ let id = message.id();
sqlx::query_scalar!(
r#"
- delete from message
- where
- id = $1
- returning 1 as "deleted: i64"
+ insert into message_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ returning 1 as "deleted: bool"
"#,
- history.message.id,
+ id,
+ deleted.at,
+ deleted.sequence,
)
.fetch_one(&mut *self.0)
.await?;
- Ok(History {
- deleted: Some(*deleted),
- ..history
- })
+ // Small social responsibility hack here: when a message is deleted, its body is
+ // retconned to have been the empty string. Someone reading the event stream
+ // afterwards, or looking at messages in the channel, cannot retrieve the
+ // "deleted" message by ignoring the deletion event.
+ sqlx::query_scalar!(
+ r#"
+ update message
+ set body = ""
+ where id = $1
+ returning 1 as "blanked: bool"
+ "#,
+ id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ let message = self.by_id(id).await?;
+
+ Ok(message)
}
- pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> {
+ pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let messages = sqlx::query_scalar!(
r#"
+ delete from message_deleted
+ where deleted_at < $1
+ returning id as "id: Id"
+ "#,
+ purge_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ for message in messages {
+ sqlx::query!(
+ r#"
+ delete from message
+ where id = $1
+ "#,
+ message,
+ )
+ .execute(&mut *self.0)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<History>, sqlx::Error> {
+ let messages = sqlx::query!(
+ r#"
select
- id as "message: Id"
+ id as "id: Id",
+ message.channel as "channel: channel::Id",
+ message.sender as "sender: login::Id",
+ message.sent_at as "sent_at: DateTime",
+ message.sent_sequence as "sent_sequence: Sequence",
+ message.body,
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from message
- where sent_at < $1
+ left join message_deleted as deleted
+ using (id)
+ where message.sent_at < $1
+ and deleted.id is null
"#,
expire_at,
)
+ .map(|row| History {
+ message: Message {
+ sent: Instant::new(row.sent_at, row.sent_sequence),
+ id: row.id,
+ channel: row.channel,
+ sender: row.sender,
+ body: row.body,
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
.fetch_all(&mut *self.0)
.await?;
@@ -226,29 +288,31 @@ impl<'c> Messages<'c> {
let messages = sqlx::query!(
r#"
select
- channel as "channel: channel::Id",
- sender as "sender: login::Id",
id as "id: Id",
- body,
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence"
+ message.channel as "channel: channel::Id",
+ message.sender as "sender: login::Id",
+ message.sent_at as "sent_at: DateTime",
+ message.sent_sequence as "sent_sequence: Sequence",
+ message.body,
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from message
+ left join message_deleted as deleted
+ using (id)
where coalesce(message.sent_sequence > $1, true)
"#,
resume_at,
)
.map(|row| History {
message: Message {
- sent: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
+ sent: Instant::new(row.sent_at, row.sent_sequence),
channel: row.channel,
sender: row.sender,
id: row.id,
body: row.body,
+ deleted_at: row.deleted_at,
},
- deleted: None,
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;