summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-01 22:30:04 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-01 22:43:14 -0400
commitb8392a5fe824eff46f912a58885546e7b0f37e6f (patch)
treeff4061bbf4be30c53f84c179f86e8e6ab584dbda /src/channel
parent7645411bcf7201e3a4927566da78080dc6a84ccf (diff)
Track event sequences globally, not per channel.
Per-channel event sequences were a cute idea, but it made reasoning about event resumption much, much harder (case in point: recovering the order of events in a partially-ordered collection is quadratic, since it's basically graph sort). The minor overhead of a global sequence number is likely tolerable, and this simplifies both the API and the internals.
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs14
-rw-r--r--src/channel/routes/test/on_create.rs5
-rw-r--r--src/channel/routes/test/on_send.rs4
3 files changed, 13 insertions, 10 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 70cda47..88f4170 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -3,8 +3,11 @@ use sqlx::sqlite::SqlitePool;
use crate::{
clock::DateTime,
- events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent},
- repo::channel::{Channel, Provider as _},
+ events::{broadcaster::Broadcaster, types::ChannelEvent},
+ repo::{
+ channel::{Channel, Provider as _},
+ sequence::Provider as _,
+ },
};
pub struct Channels<'a> {
@@ -19,9 +22,10 @@ impl<'a> Channels<'a> {
pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> {
let mut tx = self.db.begin().await?;
+ let created_sequence = tx.sequence().next().await?;
let channel = tx
.channels()
- .create(name, created_at)
+ .create(name, created_at, created_sequence)
.await
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
tx.commit().await?;
@@ -49,10 +53,10 @@ impl<'a> Channels<'a> {
let mut events = Vec::with_capacity(expired.len());
for channel in expired {
- let sequence = tx.message_events().assign_sequence(&channel).await?;
+ let deleted_sequence = tx.sequence().next().await?;
let event = tx
.channels()
- .delete_expired(&channel, sequence, relative_to)
+ .delete(&channel, relative_to, deleted_sequence)
.await?;
events.push(event);
}
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index e2610a5..5deb88a 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -38,18 +38,17 @@ async fn new_channel() {
let mut events = app
.events()
- .subscribe(types::ResumePoint::default())
+ .subscribe(None)
.await
.expect("subscribing never fails")
.filter(fixtures::filter::created());
- let types::ResumableEvent(_, event) = events
+ let event = events
.next()
.immediately()
.await
.expect("creation event published");
- assert_eq!(types::Sequence::default(), event.sequence);
assert!(matches!(
event.data,
types::ChannelEventData::Created(event)
diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs
index 233518b..d37ed21 100644
--- a/src/channel/routes/test/on_send.rs
+++ b/src/channel/routes/test/on_send.rs
@@ -43,7 +43,7 @@ async fn messages_in_order() {
let events = app
.events()
- .subscribe(types::ResumePoint::default())
+ .subscribe(None)
.await
.expect("subscribing to a valid channel")
.filter(fixtures::filter::messages())
@@ -51,7 +51,7 @@ async fn messages_in_order() {
let events = events.collect::<Vec<_>>().immediately().await;
- for ((sent_at, message), types::ResumableEvent(_, event)) in requests.into_iter().zip(events) {
+ for ((sent_at, message), event) in requests.into_iter().zip(events) {
assert_eq!(*sent_at, event.at);
assert!(matches!(
event.data,