diff options
Diffstat (limited to 'src/events/repo/message.rs')
| -rw-r--r-- | src/events/repo/message.rs | 70 |
1 files changed, 60 insertions, 10 deletions
diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs index f6fce0e..32419d5 100644 --- a/src/events/repo/message.rs +++ b/src/events/repo/message.rs @@ -6,7 +6,7 @@ use crate::{ repo::{ channel::{self, Channel}, login::{self, Login}, - message, + message::{self, Message}, }, }; @@ -30,7 +30,7 @@ impl<'c> Events<'c> { body: &str, sent_at: &DateTime, ) -> Result<types::ChannelEvent, sqlx::Error> { - let sequence = self.assign_sequence(&channel.id).await?; + let sequence = self.assign_sequence(channel).await?; let id = message::Id::generate(); @@ -59,7 +59,7 @@ impl<'c> Events<'c> { channel: channel.clone(), data: types::MessageEvent { sender: sender.clone(), - message: message::Message { + message: Message { id: row.id, body: row.body, }, @@ -72,7 +72,7 @@ impl<'c> Events<'c> { Ok(message) } - async fn assign_sequence(&mut self, channel: &channel::Id) -> Result<Sequence, sqlx::Error> { + async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> { let next = sqlx::query_scalar!( r#" update channel @@ -80,7 +80,7 @@ impl<'c> Events<'c> { where id = $1 returning last_sequence as "next_sequence: Sequence" "#, - channel, + channel.id, ) .fetch_one(&mut *self.0) .await?; @@ -88,18 +88,68 @@ impl<'c> Events<'c> { Ok(next) } - pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { - sqlx::query!( + pub async fn delete_expired( + &mut self, + channel: &Channel, + message: &message::Id, + deleted_at: &DateTime, + ) -> Result<types::ChannelEvent, sqlx::Error> { + let sequence = self.assign_sequence(channel).await?; + + sqlx::query_scalar!( r#" delete from message + where id = $1 + returning 1 as "row: i64" + "#, + message, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(types::ChannelEvent { + sequence, + at: *deleted_at, + channel: channel.clone(), + data: types::MessageDeletedEvent { + message: message.clone(), + } + .into(), + }) + } + + pub async fn expired( + &mut self, + expire_at: &DateTime, + ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + channel.created_at as "channel_created_at: DateTime", + message.id as "message: message::Id" + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id where sent_at < $1 "#, expire_at, ) - .execute(&mut *self.0) + .map(|row| { + ( + Channel { + id: row.channel_id, + name: row.channel_name, + created_at: row.channel_created_at, + }, + row.message, + ) + }) + .fetch_all(&mut *self.0) .await?; - Ok(()) + Ok(messages) } pub async fn replay( @@ -134,7 +184,7 @@ impl<'c> Events<'c> { id: row.sender_id, name: row.sender_name, }, - message: message::Message { + message: Message { id: row.id, body: row.body, }, |
