summaryrefslogtreecommitdiff
path: root/src/events/repo/message.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/repo/message.rs')
-rw-r--r--src/events/repo/message.rs70
1 files changed, 60 insertions, 10 deletions
diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs
index f6fce0e..32419d5 100644
--- a/src/events/repo/message.rs
+++ b/src/events/repo/message.rs
@@ -6,7 +6,7 @@ use crate::{
repo::{
channel::{self, Channel},
login::{self, Login},
- message,
+ message::{self, Message},
},
};
@@ -30,7 +30,7 @@ impl<'c> Events<'c> {
body: &str,
sent_at: &DateTime,
) -> Result<types::ChannelEvent, sqlx::Error> {
- let sequence = self.assign_sequence(&channel.id).await?;
+ let sequence = self.assign_sequence(channel).await?;
let id = message::Id::generate();
@@ -59,7 +59,7 @@ impl<'c> Events<'c> {
channel: channel.clone(),
data: types::MessageEvent {
sender: sender.clone(),
- message: message::Message {
+ message: Message {
id: row.id,
body: row.body,
},
@@ -72,7 +72,7 @@ impl<'c> Events<'c> {
Ok(message)
}
- async fn assign_sequence(&mut self, channel: &channel::Id) -> Result<Sequence, sqlx::Error> {
+ async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> {
let next = sqlx::query_scalar!(
r#"
update channel
@@ -80,7 +80,7 @@ impl<'c> Events<'c> {
where id = $1
returning last_sequence as "next_sequence: Sequence"
"#,
- channel,
+ channel.id,
)
.fetch_one(&mut *self.0)
.await?;
@@ -88,18 +88,68 @@ impl<'c> Events<'c> {
Ok(next)
}
- pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> {
- sqlx::query!(
+ pub async fn delete_expired(
+ &mut self,
+ channel: &Channel,
+ message: &message::Id,
+ deleted_at: &DateTime,
+ ) -> Result<types::ChannelEvent, sqlx::Error> {
+ let sequence = self.assign_sequence(channel).await?;
+
+ sqlx::query_scalar!(
r#"
delete from message
+ where id = $1
+ returning 1 as "row: i64"
+ "#,
+ message,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(types::ChannelEvent {
+ sequence,
+ at: *deleted_at,
+ channel: channel.clone(),
+ data: types::MessageDeletedEvent {
+ message: message.clone(),
+ }
+ .into(),
+ })
+ }
+
+ pub async fn expired(
+ &mut self,
+ expire_at: &DateTime,
+ ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> {
+ let messages = sqlx::query!(
+ r#"
+ select
+ channel.id as "channel_id: channel::Id",
+ channel.name as "channel_name",
+ channel.created_at as "channel_created_at: DateTime",
+ message.id as "message: message::Id"
+ from message
+ join channel on message.channel = channel.id
+ join login as sender on message.sender = sender.id
where sent_at < $1
"#,
expire_at,
)
- .execute(&mut *self.0)
+ .map(|row| {
+ (
+ Channel {
+ id: row.channel_id,
+ name: row.channel_name,
+ created_at: row.channel_created_at,
+ },
+ row.message,
+ )
+ })
+ .fetch_all(&mut *self.0)
.await?;
- Ok(())
+ Ok(messages)
}
pub async fn replay(
@@ -134,7 +184,7 @@ impl<'c> Events<'c> {
id: row.sender_id,
name: row.sender_name,
},
- message: message::Message {
+ message: Message {
id: row.id,
body: row.body,
},