From 6f07e6869bbf62903ac83c9bc061e7bde997e6a8 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 2 Oct 2024 01:10:09 -0400 Subject: Retire top-level `repo`. This helped me discover an organizational scheme I like more. --- src/channel/app.rs | 6 +- src/channel/mod.rs | 1 + src/channel/repo.rs | 165 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+), 4 deletions(-) create mode 100644 src/channel/repo.rs (limited to 'src/channel') diff --git a/src/channel/app.rs b/src/channel/app.rs index 1422651..ef0a63f 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,11 +2,9 @@ use chrono::TimeDelta; use sqlx::sqlite::SqlitePool; use crate::{ - channel::Channel, + channel::{repo::Provider as _, Channel}, clock::DateTime, - event::Sequence, - event::{broadcaster::Broadcaster, types::ChannelEvent}, - repo::{channel::Provider as _, sequence::Provider as _}, + event::{broadcaster::Broadcaster, repo::Provider as _, types::ChannelEvent, Sequence}, }; pub struct Channels<'a> { diff --git a/src/channel/mod.rs b/src/channel/mod.rs index 02d0ed4..2672084 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -2,6 +2,7 @@ use crate::{clock::DateTime, event::Sequence}; pub mod app; mod id; +pub mod repo; mod routes; pub use self::{id::Id, routes::router}; diff --git a/src/channel/repo.rs b/src/channel/repo.rs new file mode 100644 index 0000000..18cd81f --- /dev/null +++ b/src/channel/repo.rs @@ -0,0 +1,165 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::{ + channel::{Channel, Id}, + clock::DateTime, + event::{types, Sequence}, +}; + +pub trait Provider { + fn channels(&mut self) -> Channels; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn channels(&mut self) -> Channels { + Channels(self) + } +} + +pub struct Channels<'t>(&'t mut SqliteConnection); + +impl<'c> Channels<'c> { + pub async fn create( + &mut self, + name: &str, + created_at: &DateTime, + created_sequence: Sequence, + ) -> Result { + let id = Id::generate(); + let channel = sqlx::query_as!( + Channel, + r#" + insert + into channel (id, name, created_at, created_sequence) + values ($1, $2, $3, $4) + returning + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + "#, + id, + name, + created_at, + created_sequence, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(channel) + } + + pub async fn by_id(&mut self, channel: &Id) -> Result { + let channel = sqlx::query_as!( + Channel, + r#" + select + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + from channel + where id = $1 + "#, + channel, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(channel) + } + + pub async fn all( + &mut self, + resume_point: Option, + ) -> Result, sqlx::Error> { + let channels = sqlx::query_as!( + Channel, + r#" + select + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + from channel + where coalesce(created_sequence <= $1, true) + order by channel.name + "#, + resume_point, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } + + pub async fn replay( + &mut self, + resume_at: Option, + ) -> Result, sqlx::Error> { + let channels = sqlx::query_as!( + Channel, + r#" + select + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + from channel + where coalesce(created_sequence > $1, true) + "#, + resume_at, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } + + pub async fn delete( + &mut self, + channel: &Channel, + deleted_at: &DateTime, + deleted_sequence: Sequence, + ) -> Result { + let channel = channel.id.clone(); + sqlx::query_scalar!( + r#" + delete from channel + where id = $1 + returning 1 as "row: i64" + "#, + channel, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(types::ChannelEvent { + sequence: deleted_sequence, + at: *deleted_at, + data: types::DeletedEvent { channel }.into(), + }) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { + let channels = sqlx::query_as!( + Channel, + r#" + select + channel.id as "id: Id", + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence" + from channel + left join message + where created_at < $1 + and message.id is null + "#, + expired_at, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } +} -- cgit v1.2.3