diff options
Diffstat (limited to 'src/repo')
| -rw-r--r-- | src/repo/channel.rs | 165 | ||||
| -rw-r--r-- | src/repo/error.rs | 23 | ||||
| -rw-r--r-- | src/repo/mod.rs | 4 | ||||
| -rw-r--r-- | src/repo/pool.rs | 18 | ||||
| -rw-r--r-- | src/repo/sequence.rs | 44 |
5 files changed, 0 insertions, 254 deletions
diff --git a/src/repo/channel.rs b/src/repo/channel.rs deleted file mode 100644 index 18cd81f..0000000 --- a/src/repo/channel.rs +++ /dev/null @@ -1,165 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::{ - channel::{Channel, Id}, - clock::DateTime, - event::{types, Sequence}, -}; - -pub trait Provider { - fn channels(&mut self) -> Channels; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn channels(&mut self) -> Channels { - Channels(self) - } -} - -pub struct Channels<'t>(&'t mut SqliteConnection); - -impl<'c> Channels<'c> { - pub async fn create( - &mut self, - name: &str, - created_at: &DateTime, - created_sequence: Sequence, - ) -> Result<Channel, sqlx::Error> { - let id = Id::generate(); - let channel = sqlx::query_as!( - Channel, - r#" - insert - into channel (id, name, created_at, created_sequence) - values ($1, $2, $3, $4) - returning - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" - "#, - id, - name, - created_at, - created_sequence, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(channel) - } - - pub async fn by_id(&mut self, channel: &Id) -> Result<Channel, sqlx::Error> { - let channel = sqlx::query_as!( - Channel, - r#" - select - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" - from channel - where id = $1 - "#, - channel, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(channel) - } - - pub async fn all( - &mut self, - resume_point: Option<Sequence>, - ) -> Result<Vec<Channel>, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, - r#" - select - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" - from channel - where coalesce(created_sequence <= $1, true) - order by channel.name - "#, - resume_point, - ) - .fetch_all(&mut *self.0) - .await?; - - Ok(channels) - } - - pub async fn replay( - &mut self, - resume_at: Option<Sequence>, - ) -> Result<Vec<Channel>, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, - r#" - select - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" - from channel - where coalesce(created_sequence > $1, true) - "#, - resume_at, - ) - .fetch_all(&mut *self.0) - .await?; - - Ok(channels) - } - - pub async fn delete( - &mut self, - channel: &Channel, - deleted_at: &DateTime, - deleted_sequence: Sequence, - ) -> Result<types::ChannelEvent, sqlx::Error> { - let channel = channel.id.clone(); - sqlx::query_scalar!( - r#" - delete from channel - where id = $1 - returning 1 as "row: i64" - "#, - channel, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(types::ChannelEvent { - sequence: deleted_sequence, - at: *deleted_at, - data: types::DeletedEvent { channel }.into(), - }) - } - - pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Channel>, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, - r#" - select - channel.id as "id: Id", - channel.name, - channel.created_at as "created_at: DateTime", - channel.created_sequence as "created_sequence: Sequence" - from channel - left join message - where created_at < $1 - and message.id is null - "#, - expired_at, - ) - .fetch_all(&mut *self.0) - .await?; - - Ok(channels) - } -} diff --git a/src/repo/error.rs b/src/repo/error.rs deleted file mode 100644 index a5961e2..0000000 --- a/src/repo/error.rs +++ /dev/null @@ -1,23 +0,0 @@ -pub trait NotFound { - type Ok; - fn not_found<E, F>(self, map: F) -> Result<Self::Ok, E> - where - E: From<sqlx::Error>, - F: FnOnce() -> E; -} - -impl<T> NotFound for Result<T, sqlx::Error> { - type Ok = T; - - fn not_found<E, F>(self, map: F) -> Result<T, E> - where - E: From<sqlx::Error>, - F: FnOnce() -> E, - { - match self { - Err(sqlx::Error::RowNotFound) => Err(map()), - Err(other) => Err(other.into()), - Ok(value) => Ok(value), - } - } -} diff --git a/src/repo/mod.rs b/src/repo/mod.rs deleted file mode 100644 index 7abd46b..0000000 --- a/src/repo/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod channel; -pub mod error; -pub mod pool; -pub mod sequence; diff --git a/src/repo/pool.rs b/src/repo/pool.rs deleted file mode 100644 index b4aa6fc..0000000 --- a/src/repo/pool.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::str::FromStr; - -use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; - -pub async fn prepare(url: &str) -> sqlx::Result<SqlitePool> { - let pool = create(url).await?; - sqlx::migrate!().run(&pool).await?; - Ok(pool) -} - -async fn create(database_url: &str) -> sqlx::Result<SqlitePool> { - let options = SqliteConnectOptions::from_str(database_url)? - .create_if_missing(true) - .optimize_on_close(true, /* analysis_limit */ None); - - let pool = SqlitePoolOptions::new().connect_with(options).await?; - Ok(pool) -} diff --git a/src/repo/sequence.rs b/src/repo/sequence.rs deleted file mode 100644 index c985869..0000000 --- a/src/repo/sequence.rs +++ /dev/null @@ -1,44 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::event::Sequence; - -pub trait Provider { - fn sequence(&mut self) -> Sequences; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn sequence(&mut self) -> Sequences { - Sequences(self) - } -} - -pub struct Sequences<'t>(&'t mut SqliteConnection); - -impl<'c> Sequences<'c> { - pub async fn next(&mut self) -> Result<Sequence, sqlx::Error> { - let next = sqlx::query_scalar!( - r#" - update event_sequence - set last_value = last_value + 1 - returning last_value as "next_value: Sequence" - "#, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(next) - } - - pub async fn current(&mut self) -> Result<Sequence, sqlx::Error> { - let next = sqlx::query_scalar!( - r#" - select last_value as "last_value: Sequence" - from event_sequence - "#, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(next) - } -} |
