summaryrefslogtreecommitdiff
path: root/src/repo
diff options
context:
space:
mode:
Diffstat (limited to 'src/repo')
-rw-r--r--src/repo/channel.rs53
-rw-r--r--src/repo/mod.rs1
-rw-r--r--src/repo/sequence.rs45
3 files changed, 87 insertions, 12 deletions
diff --git a/src/repo/channel.rs b/src/repo/channel.rs
index 3c7468f..efc2ced 100644
--- a/src/repo/channel.rs
+++ b/src/repo/channel.rs
@@ -2,9 +2,10 @@ use std::fmt;
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
+use super::sequence::Sequence;
use crate::{
clock::DateTime,
- events::types::{self, Sequence},
+ events::types::{self},
id::Id as BaseId,
};
@@ -26,6 +27,8 @@ pub struct Channel {
pub name: String,
#[serde(skip)]
pub created_at: DateTime,
+ #[serde(skip)]
+ pub created_sequence: Sequence,
}
impl<'c> Channels<'c> {
@@ -33,25 +36,25 @@ impl<'c> Channels<'c> {
&mut self,
name: &str,
created_at: &DateTime,
+ created_sequence: Sequence,
) -> Result<Channel, sqlx::Error> {
let id = Id::generate();
- let sequence = Sequence::default();
-
let channel = sqlx::query_as!(
Channel,
r#"
insert
- into channel (id, name, created_at, last_sequence)
+ into channel (id, name, created_at, created_sequence)
values ($1, $2, $3, $4)
returning
id as "id: Id",
name,
- created_at as "created_at: DateTime"
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
"#,
id,
name,
created_at,
- sequence,
+ created_sequence,
)
.fetch_one(&mut *self.0)
.await?;
@@ -66,7 +69,8 @@ impl<'c> Channels<'c> {
select
id as "id: Id",
name,
- created_at as "created_at: DateTime"
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
from channel
where id = $1
"#,
@@ -85,7 +89,8 @@ impl<'c> Channels<'c> {
select
id as "id: Id",
name,
- created_at as "created_at: DateTime"
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
from channel
order by channel.name
"#,
@@ -96,11 +101,34 @@ impl<'c> Channels<'c> {
Ok(channels)
}
- pub async fn delete_expired(
+ pub async fn replay(
+ &mut self,
+ resume_at: Option<Sequence>,
+ ) -> Result<Vec<Channel>, sqlx::Error> {
+ let channels = sqlx::query_as!(
+ Channel,
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ from channel
+ where coalesce(created_sequence > $1, true)
+ "#,
+ resume_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
+
+ pub async fn delete(
&mut self,
channel: &Channel,
- sequence: Sequence,
deleted_at: &DateTime,
+ deleted_sequence: Sequence,
) -> Result<types::ChannelEvent, sqlx::Error> {
let channel = channel.id.clone();
sqlx::query_scalar!(
@@ -115,7 +143,7 @@ impl<'c> Channels<'c> {
.await?;
Ok(types::ChannelEvent {
- sequence,
+ sequence: deleted_sequence,
at: *deleted_at,
data: types::DeletedEvent { channel }.into(),
})
@@ -128,7 +156,8 @@ impl<'c> Channels<'c> {
select
channel.id as "id: Id",
channel.name,
- channel.created_at as "created_at: DateTime"
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence"
from channel
left join message
where created_at < $1
diff --git a/src/repo/mod.rs b/src/repo/mod.rs
index cb9d7c8..8f271f4 100644
--- a/src/repo/mod.rs
+++ b/src/repo/mod.rs
@@ -3,4 +3,5 @@ pub mod error;
pub mod login;
pub mod message;
pub mod pool;
+pub mod sequence;
pub mod token;
diff --git a/src/repo/sequence.rs b/src/repo/sequence.rs
new file mode 100644
index 0000000..8fe9dab
--- /dev/null
+++ b/src/repo/sequence.rs
@@ -0,0 +1,45 @@
+use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
+
+pub trait Provider {
+ fn sequence(&mut self) -> Sequences;
+}
+
+impl<'c> Provider for Transaction<'c, Sqlite> {
+ fn sequence(&mut self) -> Sequences {
+ Sequences(self)
+ }
+}
+
+#[derive(
+ Clone,
+ Copy,
+ Debug,
+ Eq,
+ Ord,
+ PartialEq,
+ PartialOrd,
+ serde::Deserialize,
+ serde::Serialize,
+ sqlx::Type,
+)]
+#[serde(transparent)]
+#[sqlx(transparent)]
+pub struct Sequence(i64);
+
+pub struct Sequences<'t>(&'t mut SqliteConnection);
+
+impl<'c> Sequences<'c> {
+ pub async fn next(&mut self) -> Result<Sequence, sqlx::Error> {
+ let next = sqlx::query_scalar!(
+ r#"
+ update event_sequence
+ set last_value = last_value + 1
+ returning last_value as "next_value: Sequence"
+ "#,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(next)
+ }
+}