Tim*_*sée 2 concurrency rust async-await
我正在尝试在 Rust async-await(即将稳定)中同时(而不是按顺序)运行期货列表,直到它们中的任何一个解析为true.
想象一下,有一个Vec<File>, 和一个为每个文件运行的 future 产生一个bool(可能是无序的)。这将是一个简单的顺序实现。
async fn my_function(files: Vec<File>) -> bool {
// Run the future on each file, return early if we received true
for file in files {
if long_future(file).await {
return true;
}
}
false
}
async fn long_future(file: File) -> bool {
// Some long-running task here...
}
Run Code Online (Sandbox Code Playgroud)
这有效,但我想同时运行其中一些期货以加快进程。我遇到了buffer_unordered()(on Stream),但无法弄清楚如何实现这一点。
据我了解join,鉴于您提供了一个多线程池,也可以使用类似的东西来同时运行期货。但我不知道如何在这里有效地使用它。
我尝试了这样的事情,但无法让它工作:
let any_true = futures::stream::iter(files)
.buffer_unordered(4) // Run up to 4 concurrently
.map(|file| long_future(file).await)
.filter(|stop| stop) // Only propagate true values
.next() // Return early on first true
.is_some();
Run Code Online (Sandbox Code Playgroud)
除此之外,我正在寻找类似于any迭代器中使用的东西,以替换 if 语句或filter().next().is_some()组合。
我该怎么办?
我认为你应该可以使用select_ok,正如 Some Guy 所提到的。一个例子,我用一堆替换了文件u32来说明:
use futures::future::FutureExt;
async fn long_future(file: u32) -> bool {
true
}
async fn handle_file(file: u32) -> Result<(), ()> {
let should_stop = long_future(file).await;
// Would be better if there were something more descriptive here
if should_stop {
Ok(())
} else {
Err(())
}
}
async fn tims_answer(files: Vec<u32>) -> bool {
let waits = files.into_iter().map(|f| handle_file(f).boxed());
let any_true = futures::future::select_ok(waits).await.is_ok();
any_true
}
Run Code Online (Sandbox Code Playgroud)