summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs6
-rw-r--r--src/channel/mod.rs1
-rw-r--r--src/channel/repo.rs165
3 files changed, 168 insertions, 4 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 1422651..ef0a63f 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -2,11 +2,9 @@ use chrono::TimeDelta;
use sqlx::sqlite::SqlitePool;
use crate::{
- channel::Channel,
+ channel::{repo::Provider as _, Channel},
clock::DateTime,
- event::Sequence,
- event::{broadcaster::Broadcaster, types::ChannelEvent},
- repo::{channel::Provider as _, sequence::Provider as _},
+ event::{broadcaster::Broadcaster, repo::Provider as _, types::ChannelEvent, Sequence},
};
pub struct Channels<'a> {
diff --git a/src/channel/mod.rs b/src/channel/mod.rs
index 02d0ed4..2672084 100644
--- a/src/channel/mod.rs
+++ b/src/channel/mod.rs
@@ -2,6 +2,7 @@ use crate::{clock::DateTime, event::Sequence};
pub mod app;
mod id;
+pub mod repo;
mod routes;
pub use self::{id::Id, routes::router};
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
new file mode 100644
index 0000000..18cd81f
--- /dev/null
+++ b/src/channel/repo.rs
@@ -0,0 +1,165 @@
+use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
+
+use crate::{
+ channel::{Channel, Id},
+ clock::DateTime,
+ event::{types, Sequence},
+};
+
+pub trait Provider {
+ fn channels(&mut self) -> Channels;
+}
+
+impl<'c> Provider for Transaction<'c, Sqlite> {
+ fn channels(&mut self) -> Channels {
+ Channels(self)
+ }
+}
+
+pub struct Channels<'t>(&'t mut SqliteConnection);
+
+impl<'c> Channels<'c> {
+ pub async fn create(
+ &mut self,
+ name: &str,
+ created_at: &DateTime,
+ created_sequence: Sequence,
+ ) -> Result<Channel, sqlx::Error> {
+ let id = Id::generate();
+ let channel = sqlx::query_as!(
+ Channel,
+ r#"
+ insert
+ into channel (id, name, created_at, created_sequence)
+ values ($1, $2, $3, $4)
+ returning
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ "#,
+ id,
+ name,
+ created_at,
+ created_sequence,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(channel)
+ }
+
+ pub async fn by_id(&mut self, channel: &Id) -> Result<Channel, sqlx::Error> {
+ let channel = sqlx::query_as!(
+ Channel,
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ from channel
+ where id = $1
+ "#,
+ channel,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(channel)
+ }
+
+ pub async fn all(
+ &mut self,
+ resume_point: Option<Sequence>,
+ ) -> Result<Vec<Channel>, sqlx::Error> {
+ let channels = sqlx::query_as!(
+ Channel,
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ from channel
+ where coalesce(created_sequence <= $1, true)
+ order by channel.name
+ "#,
+ resume_point,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
+
+ pub async fn replay(
+ &mut self,
+ resume_at: Option<Sequence>,
+ ) -> Result<Vec<Channel>, sqlx::Error> {
+ let channels = sqlx::query_as!(
+ Channel,
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ from channel
+ where coalesce(created_sequence > $1, true)
+ "#,
+ resume_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
+
+ pub async fn delete(
+ &mut self,
+ channel: &Channel,
+ deleted_at: &DateTime,
+ deleted_sequence: Sequence,
+ ) -> Result<types::ChannelEvent, sqlx::Error> {
+ let channel = channel.id.clone();
+ sqlx::query_scalar!(
+ r#"
+ delete from channel
+ where id = $1
+ returning 1 as "row: i64"
+ "#,
+ channel,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(types::ChannelEvent {
+ sequence: deleted_sequence,
+ at: *deleted_at,
+ data: types::DeletedEvent { channel }.into(),
+ })
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Channel>, sqlx::Error> {
+ let channels = sqlx::query_as!(
+ Channel,
+ r#"
+ select
+ channel.id as "id: Id",
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence"
+ from channel
+ left join message
+ where created_at < $1
+ and message.id is null
+ "#,
+ expired_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
+}