是否有一种 FuturesOrdered 替代方案可以一一产生结果?

kub*_*tto 0 future stream rust async-await rust-tokio

在 Rust 中,我有一堆想要并行执行的异步函数。处理这些函数结果的顺序很重要。我还想在这些函数可用时检索它们的结果。

\n

让我不好解释一下。

\n

以下是 的描述FuturesOrdered

\n
\n

这个“组合器”类似于 FuturesUnordered,但它在 future 集合之上强加了一个顺序。虽然集合中的 future 将并行完成\n,但结果将仅按其原始 future 添加到队列的顺序返回。

\n
\n

到目前为止,一切都很好。现在看这个例子:

\n
let mut ft = FuturesOrdered::new();\nft.push(wait_n(1)); // wait_n sleeps\nft.push(wait_n(2)); // for the given\nft.push(wait_n(4)); // number of secs\nft.push(wait_n(3));\nft.push(wait_n(5));\nlet r = ft.collect::<Vec<u64>>().await;\n
Run Code Online (Sandbox Code Playgroud)\n

由于FuturesOrdered\xc2\xa0await 直到所有future 都完成;这就是我得到的:

\n
|--|        ++\n|----|      ++\n|--------|  ++\n|------|    ++\n|----------|++\n            ++-> all results available here \n
Run Code Online (Sandbox Code Playgroud)\n

这就是我要的:

\n
|--|++\n|----|++\n|--------|++\n|------|    ++\n|----------|  ++\n               \n
Run Code Online (Sandbox Code Playgroud)\n

换句话说; 我要等待下一个未来;随着剩余的期货不断地完成。另请注意,即使任务#4 在任务#3 之前完成;由于最初的订单,它是在#3 之后处理的。

\n

我怎样才能获得像这样同时执行的期货流?我希望有这样的事情:

\n
|--|        ++\n|----|      ++\n|--------|  ++\n|------|    ++\n|----------|++\n            ++-> all results available here \n
Run Code Online (Sandbox Code Playgroud)\n

Mas*_*inn 9

由于 FuturesOrdered 会等待所有 future 完成

它本质上并不这样做。

您要求它这样做是因为您要collect执行Vec. 由于整个要点是将整个流StreamExt::collect转换为集合:

将流转换为集合,返回代表该计算结果的 future。返回的 future 将在流终止时得到解决。

只有在所有期货结算后,它才能产生集合。

如果您延迟访问流,它将在可用时生成项目:

let mut s = stream::FuturesOrdered::new();
s.push(future::lazy(|_| 1).boxed());
s.push(future::lazy(|_| panic!("never resolves")).boxed());

let f = s.next().await;
println!("{:?}", f);
Run Code Online (Sandbox Code Playgroud)

工作得很好,尽管第二个未来不可能解决。如果你尝试collect这样做,它会恐慌。

我怎样才能获得像这样同时执行的期货流?我希望有这样的事情:

就这样吗?

let mut s = stream::FuturesOrdered::new();
s.push(sleep(Duration::from_millis(100)));
s.push(sleep(Duration::from_millis(200)));
s.push(sleep(Duration::from_millis(400)));
s.push(sleep(Duration::from_millis(300)));
s.push(sleep(Duration::from_millis(500)));

let start = Instant::now();
while s.next().await.is_some() {
    println!("{:.2?}", Instant::now() - start);
}
Run Code Online (Sandbox Code Playgroud)
101.49ms
200.98ms
400.94ms
400.96ms
501.40ms
Run Code Online (Sandbox Code Playgroud)

(使用毫秒睡眠是因为多秒睡眠往往会导致游乐场超时)