summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/history.rs6
-rw-r--r--src/channel/repo.rs11
-rw-r--r--src/channel/routes/channel/test/post.rs3
-rw-r--r--src/channel/routes/test.rs3
4 files changed, 13 insertions, 10 deletions
diff --git a/src/channel/history.rs b/src/channel/history.rs
index dda7bb9..ef2120d 100644
--- a/src/channel/history.rs
+++ b/src/channel/history.rs
@@ -4,7 +4,7 @@ use super::{
event::{Created, Deleted, Event},
Channel, Id,
};
-use crate::event::{Instant, ResumePoint, Sequence};
+use crate::event::{Instant, Sequence};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
@@ -28,9 +28,9 @@ impl History {
self.channel.clone()
}
- pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Channel> {
+ pub fn as_of(&self, resume_point: Sequence) -> Option<Channel> {
self.events()
- .filter(Sequence::up_to(resume_point.into()))
+ .filter(Sequence::up_to(resume_point))
.collect()
}
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index f47e564..7206c21 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -5,7 +5,7 @@ use crate::{
channel::{Channel, History, Id},
clock::DateTime,
db::NotFound,
- event::{Instant, ResumePoint, Sequence},
+ event::{Instant, Sequence},
name::{self, Name},
};
@@ -144,13 +144,13 @@ impl<'c> Channels<'c> {
Ok(channels)
}
- pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> {
+ pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
let channels = sqlx::query!(
r#"
select
id as "id: Id",
- name.display_name as "display_name: String",
- name.canonical_name as "canonical_name: String",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
channel.created_at as "created_at: DateTime",
channel.created_sequence as "created_sequence: Sequence",
deleted.deleted_at as "deleted_at?: DateTime",
@@ -160,7 +160,8 @@ impl<'c> Channels<'c> {
using (id)
left join channel_deleted as deleted
using (id)
- where coalesce(channel.created_sequence > $1, true)
+ where channel.created_sequence > $1
+ or deleted.deleted_sequence > $1
"#,
resume_at,
)
diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs
index 111a703..bc0684b 100644
--- a/src/channel/routes/channel/test/post.rs
+++ b/src/channel/routes/channel/test/post.rs
@@ -15,6 +15,7 @@ async fn messages_in_order() {
let app = fixtures::scratch_app().await;
let sender = fixtures::identity::create(&app, &fixtures::now()).await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Call the endpoint (twice)
@@ -41,7 +42,7 @@ async fn messages_in_order() {
let mut events = app
.events()
- .subscribe(None)
+ .subscribe(resume_point)
.await
.expect("subscribing to a valid channel succeeds")
.filter_map(fixtures::event::message)
diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs
index f5369fb..cba8f2e 100644
--- a/src/channel/routes/test.rs
+++ b/src/channel/routes/test.rs
@@ -16,6 +16,7 @@ async fn new_channel() {
let app = fixtures::scratch_app().await;
let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+ let resume_point = fixtures::boot::resume_point(&app).await;
// Call the endpoint
@@ -44,7 +45,7 @@ async fn new_channel() {
let mut events = app
.events()
- .subscribe(None)
+ .subscribe(resume_point)
.await
.expect("subscribing never fails")
.filter_map(fixtures::event::channel)