From 357116366c1307bedaac6a3dfe9c5ed8e0e0c210 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 2 Oct 2024 00:41:25 -0400 Subject: First pass on reorganizing the backend. This is primarily renames and repackagings. --- src/event/repo/message.rs | 188 ++++++++++++++++++++++++++++++++++++++++++++++ src/event/repo/mod.rs | 1 + 2 files changed, 189 insertions(+) create mode 100644 src/event/repo/message.rs create mode 100644 src/event/repo/mod.rs (limited to 'src/event/repo') diff --git a/src/event/repo/message.rs b/src/event/repo/message.rs new file mode 100644 index 0000000..f051fec --- /dev/null +++ b/src/event/repo/message.rs @@ -0,0 +1,188 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::{ + channel::{self, Channel}, + clock::DateTime, + event::{types, Sequence}, + login::{self, Login}, + message::{self, Message}, +}; + +pub trait Provider { + fn message_events(&mut self) -> Events; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn message_events(&mut self) -> Events { + Events(self) + } +} + +pub struct Events<'t>(&'t mut SqliteConnection); + +impl<'c> Events<'c> { + pub async fn create( + &mut self, + sender: &Login, + channel: &Channel, + sent_at: &DateTime, + sent_sequence: Sequence, + body: &str, + ) -> Result { + let id = message::Id::generate(); + + let message = sqlx::query!( + r#" + insert into message + (id, channel, sender, sent_at, sent_sequence, body) + values ($1, $2, $3, $4, $5, $6) + returning + id as "id: message::Id", + sender as "sender: login::Id", + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence", + body + "#, + id, + channel.id, + sender.id, + sent_at, + sent_sequence, + body, + ) + .map(|row| types::ChannelEvent { + sequence: row.sent_sequence, + at: row.sent_at, + data: types::MessageEvent { + channel: channel.clone(), + sender: sender.clone(), + message: Message { + id: row.id, + body: row.body, + }, + } + .into(), + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(message) + } + + pub async fn delete( + &mut self, + channel: &Channel, + message: &message::Id, + deleted_at: &DateTime, + deleted_sequence: Sequence, + ) -> Result { + sqlx::query_scalar!( + r#" + delete from message + where id = $1 + returning 1 as "row: i64" + "#, + message, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(types::ChannelEvent { + sequence: deleted_sequence, + at: *deleted_at, + data: types::MessageDeletedEvent { + channel: channel.clone(), + message: message.clone(), + } + .into(), + }) + } + + pub async fn expired( + &mut self, + expire_at: &DateTime, + ) -> Result, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + channel.created_at as "channel_created_at: DateTime", + channel.created_sequence as "channel_created_sequence: Sequence", + message.id as "message: message::Id" + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id + where sent_at < $1 + "#, + expire_at, + ) + .map(|row| { + ( + Channel { + id: row.channel_id, + name: row.channel_name, + created_at: row.channel_created_at, + created_sequence: row.channel_created_sequence, + }, + row.message, + ) + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } + + pub async fn replay( + &mut self, + resume_at: Option, + ) -> Result, sqlx::Error> { + let events = sqlx::query!( + r#" + select + message.id as "id: message::Id", + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + channel.created_at as "channel_created_at: DateTime", + channel.created_sequence as "channel_created_sequence: Sequence", + sender.id as "sender_id: login::Id", + sender.name as sender_name, + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + message.body + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id + where coalesce(message.sent_sequence > $1, true) + order by sent_sequence asc + "#, + resume_at, + ) + .map(|row| types::ChannelEvent { + sequence: row.sent_sequence, + at: row.sent_at, + data: types::MessageEvent { + channel: Channel { + id: row.channel_id, + name: row.channel_name, + created_at: row.channel_created_at, + created_sequence: row.channel_created_sequence, + }, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + message: Message { + id: row.id, + body: row.body, + }, + } + .into(), + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(events) + } +} diff --git a/src/event/repo/mod.rs b/src/event/repo/mod.rs new file mode 100644 index 0000000..e216a50 --- /dev/null +++ b/src/event/repo/mod.rs @@ -0,0 +1 @@ +pub mod message; -- cgit v1.2.3