diff options
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 6 | ||||
| -rw-r--r-- | src/channel/mod.rs | 1 | ||||
| -rw-r--r-- | src/channel/repo.rs | 165 |
3 files changed, 168 insertions, 4 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 1422651..ef0a63f 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,11 +2,9 @@ use chrono::TimeDelta; use sqlx::sqlite::SqlitePool; use crate::{ - channel::Channel, + channel::{repo::Provider as _, Channel}, clock::DateTime, - event::Sequence, - event::{broadcaster::Broadcaster, types::ChannelEvent}, - repo::{channel::Provider as _, sequence::Provider as _}, + event::{broadcaster::Broadcaster, repo::Provider as _, types::ChannelEvent, Sequence}, }; pub struct Channels<'a> { diff --git a/src/channel/mod.rs b/src/channel/mod.rs index 02d0ed4..2672084 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -2,6 +2,7 @@ use crate::{clock::DateTime, event::Sequence}; pub mod app; mod id; +pub mod repo; mod routes; pub use self::{id::Id, routes::router}; diff --git a/src/channel/repo.rs b/src/channel/repo.rs new file mode 100644 index 0000000..18cd81f --- /dev/null +++ b/src/channel/repo.rs @@ -0,0 +1,165 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::{ + channel::{Channel, Id}, + clock::DateTime, + event::{types, Sequence}, +}; + +pub trait Provider { + fn channels(&mut self) -> Channels; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn channels(&mut self) -> Channels { + Channels(self) + } +} + +pub struct Channels<'t>(&'t mut SqliteConnection); + +impl<'c> Channels<'c> { + pub async fn create( + &mut self, + name: &str, + created_at: &DateTime, + created_sequence: Sequence, + ) -> Result<Channel, sqlx::Error> { + let id = Id::generate(); + let channel = sqlx::query_as!( + Channel, + r#" + insert + into channel (id, name, created_at, created_sequence) + values ($1, $2, $3, $4) + returning + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + "#, + id, + name, + created_at, + created_sequence, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(channel) + } + + pub async fn by_id(&mut self, channel: &Id) -> Result<Channel, sqlx::Error> { + let channel = sqlx::query_as!( + Channel, + r#" + select + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + from channel + where id = $1 + "#, + channel, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(channel) + } + + pub async fn all( + &mut self, + resume_point: Option<Sequence>, + ) -> Result<Vec<Channel>, sqlx::Error> { + let channels = sqlx::query_as!( + Channel, + r#" + select + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + from channel + where coalesce(created_sequence <= $1, true) + order by channel.name + "#, + resume_point, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } + + pub async fn replay( + &mut self, + resume_at: Option<Sequence>, + ) -> Result<Vec<Channel>, sqlx::Error> { + let channels = sqlx::query_as!( + Channel, + r#" + select + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + from channel + where coalesce(created_sequence > $1, true) + "#, + resume_at, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } + + pub async fn delete( + &mut self, + channel: &Channel, + deleted_at: &DateTime, + deleted_sequence: Sequence, + ) -> Result<types::ChannelEvent, sqlx::Error> { + let channel = channel.id.clone(); + sqlx::query_scalar!( + r#" + delete from channel + where id = $1 + returning 1 as "row: i64" + "#, + channel, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(types::ChannelEvent { + sequence: deleted_sequence, + at: *deleted_at, + data: types::DeletedEvent { channel }.into(), + }) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Channel>, sqlx::Error> { + let channels = sqlx::query_as!( + Channel, + r#" + select + channel.id as "id: Id", + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence" + from channel + left join message + where created_at < $1 + and message.id is null + "#, + expired_at, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } +} |
