Rust async-await:检查列表中的任何未来是否同时解析为真?

Tim*_*sée 2 concurrency rust async-await

我正在尝试在 Rust async-await(即将稳定)中同时(而不是按顺序)运行期货列表,直到它们中的任何一个解析为true.

想象一下,有一个Vec<File>, 和一个为每个文件运行的 future 产生一个bool(可能是无序的)。这将是一个简单的顺序实现。

async fn my_function(files: Vec<File>) -> bool {
    // Run the future on each file, return early if we received true
    for file in files {
        if long_future(file).await {
            return true;
        }
    }

    false
}

async fn long_future(file: File) -> bool {
    // Some long-running task here...
}
Run Code Online (Sandbox Code Playgroud)

这有效,但我想同时运行其中一些期货以加快进程。我遇到了buffer_unordered()(on Stream),但无法弄清楚如何实现这一点。

据我了解join,鉴于您提供了一个多线程池,也可以使用类似的东西来同时运行期货。但我不知道如何在这里有效地使用它。

我尝试了这样的事情,但无法让它工作:

let any_true = futures::stream::iter(files)
    .buffer_unordered(4) // Run up to 4 concurrently
    .map(|file| long_future(file).await)
    .filter(|stop| stop) // Only propagate true values
    .next() // Return early on first true
    .is_some();
Run Code Online (Sandbox Code Playgroud)

除此之外,我正在寻找类似于any迭代器中使用的东西,以替换 if 语句或filter().next().is_some()组合。

我该怎么办?

Mar*_*iep 5

我认为你应该可以使用select_ok,正如 Some Guy 所提到的。一个例子,我用一堆替换了文件u32来说明:

use futures::future::FutureExt;

async fn long_future(file: u32) -> bool {
    true
}

async fn handle_file(file: u32) -> Result<(), ()> {
    let should_stop = long_future(file).await;
    // Would be better if there were something more descriptive here
    if should_stop {
        Ok(())
    } else {
        Err(())
    }
}

async fn tims_answer(files: Vec<u32>) -> bool {
    let waits = files.into_iter().map(|f| handle_file(f).boxed());

    let any_true = futures::future::select_ok(waits).await.is_ok();

    any_true
}
Run Code Online (Sandbox Code Playgroud)

  • 非常感谢!我想明确指出 [`boxed()`](https://docs.rs/futures-preview/0.3.0-alpha.19/futures/future/trait.FutureExt.html#method.boxed)用于 [`Pin`](https://doc.rust-lang.org/std/pin/index.html) 值,按照 `select_ok` 的要求。 (2认同)