summaryrefslogtreecommitdiff
path: root/src/repo/channel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/repo/channel.rs')
-rw-r--r--src/repo/channel.rs84
1 files changed, 75 insertions, 9 deletions
diff --git a/src/repo/channel.rs b/src/repo/channel.rs
index 0186413..3c7468f 100644
--- a/src/repo/channel.rs
+++ b/src/repo/channel.rs
@@ -2,7 +2,11 @@ use std::fmt;
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
-use crate::id::Id as BaseId;
+use crate::{
+ clock::DateTime,
+ events::types::{self, Sequence},
+ id::Id as BaseId,
+};
pub trait Provider {
fn channels(&mut self) -> Channels;
@@ -16,26 +20,38 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Channels<'t>(&'t mut SqliteConnection);
-#[derive(Debug, Eq, PartialEq, serde::Serialize)]
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Channel {
pub id: Id,
pub name: String,
+ #[serde(skip)]
+ pub created_at: DateTime,
}
impl<'c> Channels<'c> {
- pub async fn create(&mut self, name: &str) -> Result<Channel, sqlx::Error> {
+ pub async fn create(
+ &mut self,
+ name: &str,
+ created_at: &DateTime,
+ ) -> Result<Channel, sqlx::Error> {
let id = Id::generate();
+ let sequence = Sequence::default();
let channel = sqlx::query_as!(
Channel,
r#"
insert
- into channel (id, name)
- values ($1, $2)
- returning id as "id: Id", name
+ into channel (id, name, created_at, last_sequence)
+ values ($1, $2, $3, $4)
+ returning
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime"
"#,
id,
name,
+ created_at,
+ sequence,
)
.fetch_one(&mut *self.0)
.await?;
@@ -47,7 +63,10 @@ impl<'c> Channels<'c> {
let channel = sqlx::query_as!(
Channel,
r#"
- select id as "id: Id", name
+ select
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime"
from channel
where id = $1
"#,
@@ -64,8 +83,9 @@ impl<'c> Channels<'c> {
Channel,
r#"
select
- channel.id as "id: Id",
- channel.name
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime"
from channel
order by channel.name
"#,
@@ -75,6 +95,52 @@ impl<'c> Channels<'c> {
Ok(channels)
}
+
+ pub async fn delete_expired(
+ &mut self,
+ channel: &Channel,
+ sequence: Sequence,
+ deleted_at: &DateTime,
+ ) -> Result<types::ChannelEvent, sqlx::Error> {
+ let channel = channel.id.clone();
+ sqlx::query_scalar!(
+ r#"
+ delete from channel
+ where id = $1
+ returning 1 as "row: i64"
+ "#,
+ channel,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(types::ChannelEvent {
+ sequence,
+ at: *deleted_at,
+ data: types::DeletedEvent { channel }.into(),
+ })
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Channel>, sqlx::Error> {
+ let channels = sqlx::query_as!(
+ Channel,
+ r#"
+ select
+ channel.id as "id: Id",
+ channel.name,
+ channel.created_at as "created_at: DateTime"
+ from channel
+ left join message
+ where created_at < $1
+ and message.id is null
+ "#,
+ expired_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
}
// Stable identifier for a [Channel]. Prefixed with `C`.