diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-01 22:30:04 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-01 22:43:14 -0400 |
| commit | b8392a5fe824eff46f912a58885546e7b0f37e6f (patch) | |
| tree | ff4061bbf4be30c53f84c179f86e8e6ab584dbda /src/channel | |
| parent | 7645411bcf7201e3a4927566da78080dc6a84ccf (diff) | |
Track event sequences globally, not per channel.
Per-channel event sequences were a cute idea, but it made reasoning about event resumption much, much harder (case in point: recovering the order of events in a partially-ordered collection is quadratic, since it's basically graph sort). The minor overhead of a global sequence number is likely tolerable, and this simplifies both the API and the internals.
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 14 | ||||
| -rw-r--r-- | src/channel/routes/test/on_create.rs | 5 | ||||
| -rw-r--r-- | src/channel/routes/test/on_send.rs | 4 |
3 files changed, 13 insertions, 10 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 70cda47..88f4170 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -3,8 +3,11 @@ use sqlx::sqlite::SqlitePool; use crate::{ clock::DateTime, - events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent}, - repo::channel::{Channel, Provider as _}, + events::{broadcaster::Broadcaster, types::ChannelEvent}, + repo::{ + channel::{Channel, Provider as _}, + sequence::Provider as _, + }, }; pub struct Channels<'a> { @@ -19,9 +22,10 @@ impl<'a> Channels<'a> { pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> { let mut tx = self.db.begin().await?; + let created_sequence = tx.sequence().next().await?; let channel = tx .channels() - .create(name, created_at) + .create(name, created_at, created_sequence) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; @@ -49,10 +53,10 @@ impl<'a> Channels<'a> { let mut events = Vec::with_capacity(expired.len()); for channel in expired { - let sequence = tx.message_events().assign_sequence(&channel).await?; + let deleted_sequence = tx.sequence().next().await?; let event = tx .channels() - .delete_expired(&channel, sequence, relative_to) + .delete(&channel, relative_to, deleted_sequence) .await?; events.push(event); } diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index e2610a5..5deb88a 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -38,18 +38,17 @@ async fn new_channel() { let mut events = app .events() - .subscribe(types::ResumePoint::default()) + .subscribe(None) .await .expect("subscribing never fails") .filter(fixtures::filter::created()); - let types::ResumableEvent(_, event) = events + let event = events .next() .immediately() .await .expect("creation event published"); - assert_eq!(types::Sequence::default(), event.sequence); assert!(matches!( event.data, types::ChannelEventData::Created(event) diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 233518b..d37ed21 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -43,7 +43,7 @@ async fn messages_in_order() { let events = app .events() - .subscribe(types::ResumePoint::default()) + .subscribe(None) .await .expect("subscribing to a valid channel") .filter(fixtures::filter::messages()) @@ -51,7 +51,7 @@ async fn messages_in_order() { let events = events.collect::<Vec<_>>().immediately().await; - for ((sent_at, message), types::ResumableEvent(_, event)) in requests.into_iter().zip(events) { + for ((sent_at, message), event) in requests.into_iter().zip(events) { assert_eq!(*sent_at, event.at); assert!(matches!( event.data, |
