diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-06-19 00:49:49 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-06-20 22:26:31 -0400 |
| commit | 057bbef5f37a4051615ad23661a0b4853b61162e (patch) | |
| tree | 2f7fecbdbe91f7b53be04b54df04364f310cfd14 /src | |
| parent | aae24382399f755cfd80e352be7f5aa584aa5470 (diff) | |
Support querying event sequences via iterators or streams.
These filters are meant to be used with, respectively, `Iterator::filter_map` and `StreamExt::filter_map`. The two operations are conceptually the same - they pass an item from the underlying sequence to a function that returns an option, drops the values for which the function returns `None`, and yields the value inside of `Some` in the resulting sequence.
However, `Iterator::filter_map` takes a function from the iterator elements to `Option<T>`. `StreamExt::filter_map` takes a function from the iterator elements to _a `Future` whose output is `Option<T>`_. As such, you can't easily use functions designed for one use case, for the other. You need an adapter - conventionally, `futures::ready`, if you have a non-async function and need an async one.
This provides two sets of sequence filters:
* `crate::test::fixtures::event` contains functions which return `Option` directly, and which are intended for use with `Iterator::filter_map`.
* `crate::test::fixtures::event::stream` contains lifted versions that return a `Future`, and which are intended for use with `StreamExt::filter_map`.
The lifting is done purely manually. I spent a lot of time writing clever-er versions before deciding on this; those were fun to write, but hell to read and not meaningfully better, and this is test support code, so we want it to be dumb and obvious. Complexity for the sake of intellectual satisfaction is a huge antifeature in this context.
Diffstat (limited to 'src')
| -rw-r--r-- | src/channel/handlers/create/test.rs | 4 | ||||
| -rw-r--r-- | src/channel/handlers/send/test.rs | 4 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/channel.rs | 28 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/invite.rs | 8 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/message.rs | 36 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/resume.rs | 20 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/setup.rs | 4 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/token.rs | 12 | ||||
| -rw-r--r-- | src/test/fixtures/event.rs | 79 | ||||
| -rw-r--r-- | src/test/fixtures/event/mod.rs | 74 | ||||
| -rw-r--r-- | src/test/fixtures/event/stream.rs | 62 |
11 files changed, 194 insertions, 137 deletions
diff --git a/src/channel/handlers/create/test.rs b/src/channel/handlers/create/test.rs index 3c770cf..595a879 100644 --- a/src/channel/handlers/create/test.rs +++ b/src/channel/handlers/create/test.rs @@ -47,8 +47,8 @@ async fn new_channel() { .subscribe(resume_point) .await .expect("subscribing never fails") - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::created) .filter(|event| future::ready(event.channel == response)); let event = events.next().expect_some("creation event published").await; diff --git a/src/channel/handlers/send/test.rs b/src/channel/handlers/send/test.rs index f43f901..7204ca4 100644 --- a/src/channel/handlers/send/test.rs +++ b/src/channel/handlers/send/test.rs @@ -45,8 +45,8 @@ async fn messages_in_order() { .subscribe(resume_point) .await .expect("subscribing to a valid channel succeeds") - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(requests)); while let Some((event, (sent_at, body))) = events diff --git a/src/event/handlers/stream/test/channel.rs b/src/event/handlers/stream/test/channel.rs index 187c3c3..2b87ce2 100644 --- a/src/event/handlers/stream/test/channel.rs +++ b/src/event/handlers/stream/test/channel.rs @@ -35,8 +35,8 @@ async fn creating() { // Verify channel created event events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::created) .filter(|event| future::ready(event.channel == channel)) .next() .expect_some("channel created event is delivered") @@ -74,8 +74,8 @@ async fn previously_created() { // Verify channel created event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::created) .filter(|event| future::ready(event.channel == channel)) .next() .expect_some("channel created event is delivered") @@ -111,8 +111,8 @@ async fn expiring() { // Check for expiry event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_some("a deleted channel event will be delivered") @@ -148,8 +148,8 @@ async fn previously_expired() { // Check for expiry event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_some("a deleted channel event will be delivered") @@ -185,8 +185,8 @@ async fn deleting() { // Check for delete event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_some("a deleted channel event will be delivered") @@ -222,8 +222,8 @@ async fn previously_deleted() { // Check for expiry event let _ = events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_some("a deleted channel event will be delivered") @@ -264,8 +264,8 @@ async fn previously_purged() { // Check for expiry event events - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::stream::channel) + .filter_map(fixtures::event::stream::channel::deleted) .filter(|event| future::ready(event.id == channel.id)) .next() .expect_wait("deleted channel events not delivered") diff --git a/src/event/handlers/stream/test/invite.rs b/src/event/handlers/stream/test/invite.rs index c8e12fb..01372ce 100644 --- a/src/event/handlers/stream/test/invite.rs +++ b/src/event/handlers/stream/test/invite.rs @@ -37,8 +37,8 @@ async fn accepting_invite() { // Expect a login created event let _ = events - .filter_map(fixtures::event::user) - .filter_map(fixtures::event::user::created) + .filter_map(fixtures::event::stream::user) + .filter_map(fixtures::event::stream::user::created) .filter(|event| future::ready(event.user == joiner)) .next() .expect_some("a login created event is sent") @@ -78,8 +78,8 @@ async fn previously_accepted_invite() { // Expect a login created event let _ = events - .filter_map(fixtures::event::user) - .filter_map(fixtures::event::user::created) + .filter_map(fixtures::event::stream::user) + .filter_map(fixtures::event::stream::user::created) .filter(|event| future::ready(event.user == joiner)) .next() .expect_some("a login created event is sent") diff --git a/src/event/handlers/stream/test/message.rs b/src/event/handlers/stream/test/message.rs index a80c896..4369996 100644 --- a/src/event/handlers/stream/test/message.rs +++ b/src/event/handlers/stream/test/message.rs @@ -44,8 +44,8 @@ async fn sending() { // Verify that an event is delivered let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == message)) .next() .expect_some("delivered message sent event") @@ -89,8 +89,8 @@ async fn previously_sent() { // Verify that an event is delivered let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == message)) .next() .expect_some("delivered message sent event") @@ -135,8 +135,8 @@ async fn sent_in_multiple_channels() { // Verify the structure of the response. let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .take(messages.len()) .collect::<Vec<_>>() .expect_ready("events ready") @@ -177,8 +177,8 @@ async fn sent_sequentially() { // Verify the expected events in the expected order let mut events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))); for message in &messages { @@ -222,8 +222,8 @@ async fn expiring() { // Check for expiry event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -261,8 +261,8 @@ async fn previously_expired() { // Check for expiry event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -300,8 +300,8 @@ async fn deleting() { // Check for delete event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -339,8 +339,8 @@ async fn previously_deleted() { // Check for delete event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -384,8 +384,8 @@ async fn previously_purged() { // Check for delete event events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_wait("no deleted message will be delivered") diff --git a/src/event/handlers/stream/test/resume.rs b/src/event/handlers/stream/test/resume.rs index 34fee4d..835d350 100644 --- a/src/event/handlers/stream/test/resume.rs +++ b/src/event/handlers/stream/test/resume.rs @@ -41,8 +41,8 @@ async fn resume() { .expect("subscribe never fails"); let event = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == initial_message)) .next() .expect_some("delivered event for initial message") @@ -64,8 +64,8 @@ async fn resume() { // Verify final events let mut events = resumed - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(later_messages)); while let Some((event, message)) = events.next().expect_ready("event ready").await { @@ -125,8 +125,8 @@ async fn serial_resume() { // Check for expected events let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(initial_messages)) .collect::<Vec<_>>() .expect_ready("zipping a finite list of events is ready immediately") @@ -168,8 +168,8 @@ async fn serial_resume() { // Check for expected events let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(resume_messages)) .collect::<Vec<_>>() .expect_ready("zipping a finite list of events is ready immediately") @@ -211,8 +211,8 @@ async fn serial_resume() { // Check for expected events let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(final_messages)) .collect::<Vec<_>>() .expect_ready("zipping a finite list of events is ready immediately") diff --git a/src/event/handlers/stream/test/setup.rs b/src/event/handlers/stream/test/setup.rs index 5335055..992b962 100644 --- a/src/event/handlers/stream/test/setup.rs +++ b/src/event/handlers/stream/test/setup.rs @@ -38,8 +38,8 @@ async fn previously_completed() { // Expect a login created event let _ = events - .filter_map(fixtures::event::user) - .filter_map(fixtures::event::user::created) + .filter_map(fixtures::event::stream::user) + .filter_map(fixtures::event::stream::user::created) .filter(|event| future::ready(event.user == owner)) .next() .expect_some("a login created event is sent") diff --git a/src/event/handlers/stream/test/token.rs b/src/event/handlers/stream/test/token.rs index 2008323..e32b489 100644 --- a/src/event/handlers/stream/test/token.rs +++ b/src/event/handlers/stream/test/token.rs @@ -43,8 +43,8 @@ async fn terminates_on_token_expiry() { ]; events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .expect_none("end of stream") @@ -89,8 +89,8 @@ async fn terminates_on_logout() { ]; events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .expect_none("end of stream") @@ -139,8 +139,8 @@ async fn terminates_on_password_change() { ]; events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) .next() .expect_none("end of stream") diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs deleted file mode 100644 index a30bb4b..0000000 --- a/src/test/fixtures/event.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::future::{self, Ready}; - -use crate::event::Event; - -pub fn channel(event: Event) -> Ready<Option<channel::Event>> { - future::ready(match event { - Event::Channel(channel) => Some(channel), - _ => None, - }) -} - -pub fn message(event: Event) -> Ready<Option<message::Event>> { - future::ready(match event { - Event::Message(event) => Some(event), - _ => None, - }) -} - -pub fn user(event: Event) -> Ready<Option<user::Event>> { - future::ready(match event { - Event::User(event) => Some(event), - _ => None, - }) -} - -pub mod channel { - use std::future::{self, Ready}; - - pub use crate::channel::Event; - use crate::channel::event; - - pub fn created(event: Event) -> Ready<Option<event::Created>> { - future::ready(match event { - Event::Created(event) => Some(event), - Event::Deleted(_) => None, - }) - } - - pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> { - future::ready(match event { - Event::Deleted(event) => Some(event), - Event::Created(_) => None, - }) - } -} - -pub mod message { - use std::future::{self, Ready}; - - pub use crate::message::Event; - use crate::message::event; - - pub fn sent(event: Event) -> Ready<Option<event::Sent>> { - future::ready(match event { - Event::Sent(event) => Some(event), - Event::Deleted(_) => None, - }) - } - - pub fn deleted(event: Event) -> future::Ready<Option<event::Deleted>> { - future::ready(match event { - Event::Deleted(event) => Some(event), - Event::Sent(_) => None, - }) - } -} - -pub mod user { - use std::future::{self, Ready}; - - pub use crate::user::Event; - use crate::user::event; - - pub fn created(event: Event) -> Ready<Option<event::Created>> { - future::ready(match event { - Event::Created(event) => Some(event), - }) - } -} diff --git a/src/test/fixtures/event/mod.rs b/src/test/fixtures/event/mod.rs new file mode 100644 index 0000000..691cdeb --- /dev/null +++ b/src/test/fixtures/event/mod.rs @@ -0,0 +1,74 @@ +use crate::event::Event; + +pub mod stream; + +pub fn channel(event: Event) -> Option<crate::channel::Event> { + match event { + Event::Channel(channel) => Some(channel), + _ => None, + } +} + +pub fn message(event: Event) -> Option<crate::message::Event> { + match event { + Event::Message(event) => Some(event), + _ => None, + } +} + +pub fn user(event: Event) -> Option<crate::user::Event> { + match event { + Event::User(event) => Some(event), + _ => None, + } +} + +pub mod channel { + use crate::channel::{Event, event}; + + pub fn created(event: Event) -> Option<event::Created> { + match event { + Event::Created(event) => Some(event), + Event::Deleted(_) => None, + } + } + + pub fn deleted(event: Event) -> Option<event::Deleted> { + match event { + Event::Deleted(event) => Some(event), + Event::Created(_) => None, + } + } +} + +pub mod message { + use crate::message::{Event, event}; + + pub fn sent(event: Event) -> Option<event::Sent> { + match event { + Event::Sent(event) => Some(event), + Event::Deleted(_) => None, + } + } + + pub fn deleted(event: Event) -> Option<event::Deleted> { + match event { + Event::Deleted(event) => Some(event), + Event::Sent(_) => None, + } + } +} + +pub mod user { + use crate::user::{Event, event}; + + // This could be defined as `-> event::Created`. However, I want the interface to be consistent + // with the event stream transformers for other types, and we'd have to refactor the return type + // to `-> Option<event::Created>` the instant users sprout a second event, anyways. + #[allow(clippy::unnecessary_wraps)] + pub fn created(event: Event) -> Option<event::Created> { + match event { + Event::Created(event) => Some(event), + } + } +} diff --git a/src/test/fixtures/event/stream.rs b/src/test/fixtures/event/stream.rs new file mode 100644 index 0000000..6c2a1bf --- /dev/null +++ b/src/test/fixtures/event/stream.rs @@ -0,0 +1,62 @@ +use std::future::{self, Ready}; + +use crate::{event::Event, test::fixtures::event}; + +pub fn channel(event: Event) -> Ready<Option<crate::channel::Event>> { + future::ready(event::channel(event)) +} + +pub fn message(event: Event) -> Ready<Option<crate::message::Event>> { + future::ready(event::message(event)) +} + +pub fn user(event: Event) -> Ready<Option<crate::user::Event>> { + future::ready(event::user(event)) +} + +pub mod channel { + use std::future::{self, Ready}; + + use crate::{ + channel::{Event, event}, + test::fixtures::event::channel, + }; + + pub fn created(event: Event) -> Ready<Option<event::Created>> { + future::ready(channel::created(event)) + } + + pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> { + future::ready(channel::deleted(event)) + } +} + +pub mod message { + use std::future::{self, Ready}; + + use crate::{ + message::{Event, event}, + test::fixtures::event::message, + }; + + pub fn sent(event: Event) -> Ready<Option<event::Sent>> { + future::ready(message::sent(event)) + } + + pub fn deleted(event: Event) -> future::Ready<Option<event::Deleted>> { + future::ready(message::deleted(event)) + } +} + +pub mod user { + use std::future::{self, Ready}; + + use crate::{ + test::fixtures::event::user, + user::{Event, event}, + }; + + pub fn created(event: Event) -> Ready<Option<event::Created>> { + future::ready(user::created(event)) + } +} |
