use std::{future::IntoFuture, time::Duration}; use futures::{stream, Stream}; use tokio::time::timeout; async fn immediately(fut: F) -> F::Output where F: IntoFuture, { // I haven't been particularly rigorous here. Zero delay _seems to work_, // but this can be set higher; it makes tests that fail to meet the // "immediate" expectation take longer, but gives slow tests time to // succeed, as well. let duration = Duration::from_nanos(0); timeout(duration, fut) .await .expect("expected result immediately") } // This is only intended for streams, since their `next()`, `collect()`, and // so on can all block indefinitely on an empty stream. There's no need to // force immediacy on futures that "can't" block forever, and it can hide logic // errors if you do that. // // The impls below _could_ be replaced with a blanket impl for all future // types, otherwise. The choice to restrict impls to stream futures is // deliberate. pub trait Immediately { type Output; async fn immediately(self) -> Self::Output; } impl<'a, St> Immediately for stream::Next<'a, St> where St: Stream + Unpin + ?Sized, { type Output = Option<::Item>; async fn immediately(self) -> Self::Output { immediately(self).await } } impl Immediately for stream::Collect where St: Stream, C: Default + Extend<::Item>, { type Output = C; async fn immediately(self) -> Self::Output { immediately(self).await } }