summaryrefslogtreecommitdiff
path: root/src/repo/channel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/repo/channel.rs')
-rw-r--r--src/repo/channel.rs53
1 files changed, 41 insertions, 12 deletions
diff --git a/src/repo/channel.rs b/src/repo/channel.rs
index 3c7468f..efc2ced 100644
--- a/src/repo/channel.rs
+++ b/src/repo/channel.rs
@@ -2,9 +2,10 @@ use std::fmt;
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
+use super::sequence::Sequence;
use crate::{
clock::DateTime,
- events::types::{self, Sequence},
+ events::types::{self},
id::Id as BaseId,
};
@@ -26,6 +27,8 @@ pub struct Channel {
pub name: String,
#[serde(skip)]
pub created_at: DateTime,
+ #[serde(skip)]
+ pub created_sequence: Sequence,
}
impl<'c> Channels<'c> {
@@ -33,25 +36,25 @@ impl<'c> Channels<'c> {
&mut self,
name: &str,
created_at: &DateTime,
+ created_sequence: Sequence,
) -> Result<Channel, sqlx::Error> {
let id = Id::generate();
- let sequence = Sequence::default();
-
let channel = sqlx::query_as!(
Channel,
r#"
insert
- into channel (id, name, created_at, last_sequence)
+ into channel (id, name, created_at, created_sequence)
values ($1, $2, $3, $4)
returning
id as "id: Id",
name,
- created_at as "created_at: DateTime"
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
"#,
id,
name,
created_at,
- sequence,
+ created_sequence,
)
.fetch_one(&mut *self.0)
.await?;
@@ -66,7 +69,8 @@ impl<'c> Channels<'c> {
select
id as "id: Id",
name,
- created_at as "created_at: DateTime"
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
from channel
where id = $1
"#,
@@ -85,7 +89,8 @@ impl<'c> Channels<'c> {
select
id as "id: Id",
name,
- created_at as "created_at: DateTime"
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
from channel
order by channel.name
"#,
@@ -96,11 +101,34 @@ impl<'c> Channels<'c> {
Ok(channels)
}
- pub async fn delete_expired(
+ pub async fn replay(
+ &mut self,
+ resume_at: Option<Sequence>,
+ ) -> Result<Vec<Channel>, sqlx::Error> {
+ let channels = sqlx::query_as!(
+ Channel,
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ from channel
+ where coalesce(created_sequence > $1, true)
+ "#,
+ resume_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
+
+ pub async fn delete(
&mut self,
channel: &Channel,
- sequence: Sequence,
deleted_at: &DateTime,
+ deleted_sequence: Sequence,
) -> Result<types::ChannelEvent, sqlx::Error> {
let channel = channel.id.clone();
sqlx::query_scalar!(
@@ -115,7 +143,7 @@ impl<'c> Channels<'c> {
.await?;
Ok(types::ChannelEvent {
- sequence,
+ sequence: deleted_sequence,
at: *deleted_at,
data: types::DeletedEvent { channel }.into(),
})
@@ -128,7 +156,8 @@ impl<'c> Channels<'c> {
select
channel.id as "id: Id",
channel.name,
- channel.created_at as "created_at: DateTime"
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence"
from channel
left join message
where created_at < $1