summaryrefslogtreecommitdiff
path: root/src/channel/repo.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel/repo.rs')
-rw-r--r--src/channel/repo.rs202
1 files changed, 202 insertions, 0 deletions
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
new file mode 100644
index 0000000..2b48436
--- /dev/null
+++ b/src/channel/repo.rs
@@ -0,0 +1,202 @@
+use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
+
+use crate::{
+ channel::{Channel, History, Id},
+ clock::DateTime,
+ event::{Instant, Sequence},
+};
+
+pub trait Provider {
+ fn channels(&mut self) -> Channels;
+}
+
+impl<'c> Provider for Transaction<'c, Sqlite> {
+ fn channels(&mut self) -> Channels {
+ Channels(self)
+ }
+}
+
+pub struct Channels<'t>(&'t mut SqliteConnection);
+
+impl<'c> Channels<'c> {
+ pub async fn create(&mut self, name: &str, created: &Instant) -> Result<History, sqlx::Error> {
+ let id = Id::generate();
+ let channel = sqlx::query!(
+ r#"
+ insert
+ into channel (id, name, created_at, created_sequence)
+ values ($1, $2, $3, $4)
+ returning
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ "#,
+ id,
+ name,
+ created.at,
+ created.sequence,
+ )
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ deleted: None,
+ })
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(channel)
+ }
+
+ pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> {
+ let channel = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ from channel
+ where id = $1
+ "#,
+ channel,
+ )
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ deleted: None,
+ })
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(channel)
+ }
+
+ pub async fn all(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, sqlx::Error> {
+ let channels = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ from channel
+ where coalesce(created_sequence <= $1, true)
+ order by channel.name
+ "#,
+ resume_at,
+ )
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ deleted: None,
+ })
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
+
+ pub async fn replay(
+ &mut self,
+ resume_at: Option<Sequence>,
+ ) -> Result<Vec<History>, sqlx::Error> {
+ let channels = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ from channel
+ where coalesce(created_sequence > $1, true)
+ "#,
+ resume_at,
+ )
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ deleted: None,
+ })
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
+
+ pub async fn delete(
+ &mut self,
+ channel: &Id,
+ deleted: &Instant,
+ ) -> Result<History, sqlx::Error> {
+ let channel = sqlx::query!(
+ r#"
+ delete from channel
+ where id = $1
+ returning
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ "#,
+ channel,
+ )
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ deleted: Some(*deleted),
+ })
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(channel)
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> {
+ let channels = sqlx::query_scalar!(
+ r#"
+ select
+ channel.id as "id: Id"
+ from channel
+ left join message
+ where created_at < $1
+ and message.id is null
+ "#,
+ expired_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
+}