diff options
Diffstat (limited to 'src/message/repo.rs')
| -rw-r--r-- | src/message/repo.rs | 204 |
1 files changed, 134 insertions, 70 deletions
diff --git a/src/message/repo.rs b/src/message/repo.rs index 71c6d10..14ff7bf 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -53,14 +53,12 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, body: row.body, + deleted_at: None, }, deleted: None, }) @@ -70,41 +68,37 @@ impl<'c> Messages<'c> { Ok(message) } - pub async fn in_channel( - &mut self, - channel: &channel::History, - resume_at: ResumePoint, - ) -> Result<Vec<History>, sqlx::Error> { + pub async fn live(&mut self, channel: &channel::History) -> Result<Vec<History>, sqlx::Error> { let channel_id = channel.id(); let messages = sqlx::query!( r#" select - channel as "channel: channel::Id", - sender as "sender: login::Id", + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", id as "id: Id", - body, - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence" + message.body, + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from message - where channel = $1 - and coalesce(sent_sequence <= $2, true) - order by sent_sequence + left join message_deleted as deleted + using (id) + where message.channel = $1 + and deleted.id is null "#, channel_id, - resume_at, ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, body: row.body, + deleted_at: row.deleted_at, }, - deleted: None, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -116,30 +110,32 @@ impl<'c> Messages<'c> { let messages = sqlx::query!( r#" select - channel as "channel: channel::Id", - sender as "sender: login::Id", + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", id as "id: Id", - body, - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence" + message.body, + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from message - where coalesce(sent_sequence <= $2, true) - order by sent_sequence + left join message_deleted as deleted + using (id) + where coalesce(message.sent_sequence <= $2, true) + order by message.sent_sequence "#, resume_at, ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, body: row.body, + deleted_at: row.deleted_at, }, - deleted: None, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -147,33 +143,35 @@ impl<'c> Messages<'c> { Ok(messages) } - async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> { + pub async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> { let message = sqlx::query!( r#" select - channel as "channel: channel::Id", - sender as "sender: login::Id", + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", id as "id: Id", - body, - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence" + message.body, + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from message + left join message_deleted as deleted + using (id) where id = $1 "#, message, ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, body: row.body, + deleted_at: row.deleted_at, }, - deleted: None, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_one(&mut *self.0) .await?; @@ -183,39 +181,103 @@ impl<'c> Messages<'c> { pub async fn delete( &mut self, - message: &Id, + message: &History, deleted: &Instant, ) -> Result<History, sqlx::Error> { - let history = self.by_id(message).await?; + let id = message.id(); sqlx::query_scalar!( r#" - delete from message - where - id = $1 - returning 1 as "deleted: i64" + insert into message_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + returning 1 as "deleted: bool" "#, - history.message.id, + id, + deleted.at, + deleted.sequence, ) .fetch_one(&mut *self.0) .await?; - Ok(History { - deleted: Some(*deleted), - ..history - }) + // Small social responsibility hack here: when a message is deleted, its body is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at messages in the channel, cannot retrieve the + // "deleted" message by ignoring the deletion event. + sqlx::query_scalar!( + r#" + update message + set body = "" + where id = $1 + returning 1 as "blanked: bool" + "#, + id, + ) + .fetch_one(&mut *self.0) + .await?; + + let message = self.by_id(id).await?; + + Ok(message) } - pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let messages = sqlx::query_scalar!( r#" + delete from message_deleted + where deleted_at < $1 + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for message in messages { + sqlx::query!( + r#" + delete from message + where id = $1 + "#, + message, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<History>, sqlx::Error> { + let messages = sqlx::query!( + r#" select - id as "message: Id" + id as "id: Id", + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + message.body, + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from message - where sent_at < $1 + left join message_deleted as deleted + using (id) + where message.sent_at < $1 + and deleted.id is null "#, expire_at, ) + .map(|row| History { + message: Message { + sent: Instant::new(row.sent_at, row.sent_sequence), + id: row.id, + channel: row.channel, + sender: row.sender, + body: row.body, + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) .fetch_all(&mut *self.0) .await?; @@ -226,29 +288,31 @@ impl<'c> Messages<'c> { let messages = sqlx::query!( r#" select - channel as "channel: channel::Id", - sender as "sender: login::Id", id as "id: Id", - body, - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence" + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + message.body, + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from message + left join message_deleted as deleted + using (id) where coalesce(message.sent_sequence > $1, true) "#, resume_at, ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, body: row.body, + deleted_at: row.deleted_at, }, - deleted: None, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; |
