summaryrefslogtreecommitdiff
path: root/src/events/repo/message.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-27 23:03:46 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-28 01:00:12 -0400
commit60b711c844f8624348d5d1dac3a625532a8e2a82 (patch)
treea667cfa3833046425a87ec03c700d6124af70e4e /src/events/repo/message.rs
parent08c3a6e77a3f61ffc9643a5e1f840df9078d0b36 (diff)
Delete expired messages out of band.
Trying to reliably do expiry mid-request was causing some anomalies: * Creating a channel with a dup name would fail, then succeed after listing channels. It was very hard to reason about which operations needed to trigger expiry, to fix this "correctly," so now expiry runs on every request.
Diffstat (limited to 'src/events/repo/message.rs')
-rw-r--r--src/events/repo/message.rs70
1 files changed, 60 insertions, 10 deletions
diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs
index f6fce0e..32419d5 100644
--- a/src/events/repo/message.rs
+++ b/src/events/repo/message.rs
@@ -6,7 +6,7 @@ use crate::{
repo::{
channel::{self, Channel},
login::{self, Login},
- message,
+ message::{self, Message},
},
};
@@ -30,7 +30,7 @@ impl<'c> Events<'c> {
body: &str,
sent_at: &DateTime,
) -> Result<types::ChannelEvent, sqlx::Error> {
- let sequence = self.assign_sequence(&channel.id).await?;
+ let sequence = self.assign_sequence(channel).await?;
let id = message::Id::generate();
@@ -59,7 +59,7 @@ impl<'c> Events<'c> {
channel: channel.clone(),
data: types::MessageEvent {
sender: sender.clone(),
- message: message::Message {
+ message: Message {
id: row.id,
body: row.body,
},
@@ -72,7 +72,7 @@ impl<'c> Events<'c> {
Ok(message)
}
- async fn assign_sequence(&mut self, channel: &channel::Id) -> Result<Sequence, sqlx::Error> {
+ async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> {
let next = sqlx::query_scalar!(
r#"
update channel
@@ -80,7 +80,7 @@ impl<'c> Events<'c> {
where id = $1
returning last_sequence as "next_sequence: Sequence"
"#,
- channel,
+ channel.id,
)
.fetch_one(&mut *self.0)
.await?;
@@ -88,18 +88,68 @@ impl<'c> Events<'c> {
Ok(next)
}
- pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> {
- sqlx::query!(
+ pub async fn delete_expired(
+ &mut self,
+ channel: &Channel,
+ message: &message::Id,
+ deleted_at: &DateTime,
+ ) -> Result<types::ChannelEvent, sqlx::Error> {
+ let sequence = self.assign_sequence(channel).await?;
+
+ sqlx::query_scalar!(
r#"
delete from message
+ where id = $1
+ returning 1 as "row: i64"
+ "#,
+ message,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(types::ChannelEvent {
+ sequence,
+ at: *deleted_at,
+ channel: channel.clone(),
+ data: types::MessageDeletedEvent {
+ message: message.clone(),
+ }
+ .into(),
+ })
+ }
+
+ pub async fn expired(
+ &mut self,
+ expire_at: &DateTime,
+ ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> {
+ let messages = sqlx::query!(
+ r#"
+ select
+ channel.id as "channel_id: channel::Id",
+ channel.name as "channel_name",
+ channel.created_at as "channel_created_at: DateTime",
+ message.id as "message: message::Id"
+ from message
+ join channel on message.channel = channel.id
+ join login as sender on message.sender = sender.id
where sent_at < $1
"#,
expire_at,
)
- .execute(&mut *self.0)
+ .map(|row| {
+ (
+ Channel {
+ id: row.channel_id,
+ name: row.channel_name,
+ created_at: row.channel_created_at,
+ },
+ row.message,
+ )
+ })
+ .fetch_all(&mut *self.0)
.await?;
- Ok(())
+ Ok(messages)
}
pub async fn replay(
@@ -134,7 +184,7 @@ impl<'c> Events<'c> {
id: row.sender_id,
name: row.sender_name,
},
- message: message::Message {
+ message: Message {
id: row.id,
body: row.body,
},