From 057bbef5f37a4051615ad23661a0b4853b61162e Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Thu, 19 Jun 2025 00:49:49 -0400 Subject: Support querying event sequences via iterators or streams. These filters are meant to be used with, respectively, `Iterator::filter_map` and `StreamExt::filter_map`. The two operations are conceptually the same - they pass an item from the underlying sequence to a function that returns an option, drops the values for which the function returns `None`, and yields the value inside of `Some` in the resulting sequence. However, `Iterator::filter_map` takes a function from the iterator elements to `Option`. `StreamExt::filter_map` takes a function from the iterator elements to _a `Future` whose output is `Option`_. As such, you can't easily use functions designed for one use case, for the other. You need an adapter - conventionally, `futures::ready`, if you have a non-async function and need an async one. This provides two sets of sequence filters: * `crate::test::fixtures::event` contains functions which return `Option` directly, and which are intended for use with `Iterator::filter_map`. * `crate::test::fixtures::event::stream` contains lifted versions that return a `Future`, and which are intended for use with `StreamExt::filter_map`. The lifting is done purely manually. I spent a lot of time writing clever-er versions before deciding on this; those were fun to write, but hell to read and not meaningfully better, and this is test support code, so we want it to be dumb and obvious. Complexity for the sake of intellectual satisfaction is a huge antifeature in this context. --- src/event/handlers/stream/test/message.rs | 36 +++++++++++++++---------------- 1 file changed, 18 insertions(+), 18 deletions(-) (limited to 'src/event/handlers/stream/test/message.rs') diff --git a/src/event/handlers/stream/test/message.rs b/src/event/handlers/stream/test/message.rs index a80c896..4369996 100644 --- a/src/event/handlers/stream/test/message.rs +++ b/src/event/handlers/stream/test/message.rs @@ -44,8 +44,8 @@ async fn sending() { // Verify that an event is delivered let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == message)) .next() .expect_some("delivered message sent event") @@ -89,8 +89,8 @@ async fn previously_sent() { // Verify that an event is delivered let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(event.message == message)) .next() .expect_some("delivered message sent event") @@ -135,8 +135,8 @@ async fn sent_in_multiple_channels() { // Verify the structure of the response. let events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .take(messages.len()) .collect::>() .expect_ready("events ready") @@ -177,8 +177,8 @@ async fn sent_sequentially() { // Verify the expected events in the expected order let mut events = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::sent) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::sent) .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))); for message in &messages { @@ -222,8 +222,8 @@ async fn expiring() { // Check for expiry event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -261,8 +261,8 @@ async fn previously_expired() { // Check for expiry event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -300,8 +300,8 @@ async fn deleting() { // Check for delete event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -339,8 +339,8 @@ async fn previously_deleted() { // Check for delete event let _ = events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_some("a deleted message event will be delivered") @@ -384,8 +384,8 @@ async fn previously_purged() { // Check for delete event events - .filter_map(fixtures::event::message) - .filter_map(fixtures::event::message::deleted) + .filter_map(fixtures::event::stream::message) + .filter_map(fixtures::event::stream::message::deleted) .filter(|event| future::ready(event.id == message.id)) .next() .expect_wait("no deleted message will be delivered") -- cgit v1.2.3