顺序执行期货集合

Jaa*_*rus 4 future rust

我有一个期货集合,我想合并为一个单一的期货,以使它们按顺序执行。

我调查了该futures_ordered功能。它似乎按顺序返回结果,但期货同时执行。

我尝试fold将期货与结合使用and_then。但是,这对于类型系统来说是棘手的。

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = tasks.into_iter().fold(
    ok(()),                             // seed
    |acc, task| acc.and_then(|_| task), // accumulator
);
Run Code Online (Sandbox Code Playgroud)

操场

这给出了以下错误:

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = tasks.into_iter().fold(
    ok(()),                             // seed
    |acc, task| acc.and_then(|_| task), // accumulator
);
Run Code Online (Sandbox Code Playgroud)

我可能正在解决这个错误,但是我的想法已经用完了。

Luk*_*oid 5

Stream具有buffered允许您限制同时轮询的期货数量的功能。

如果您有一组期货,您可以创建一个流并buffered像这样使用:

let tasks = vec![future1, future2];
let stream = ::futures::stream::iter_ok(tasks);
let mut when_result_ready = stream.buffered(1);
Run Code Online (Sandbox Code Playgroud)

when_result_ready现在将是一个Stream实现,它一次只轮询一个未来,并在每个未来完成后移动到下一个。

更新

根据评论和分析,它似乎buffered有很大的开销,所以另一种解决方案是将每个转换Future为 aStreamflatten它们:

iter_ok(tasks).map(|f|f.into_stream()).flatten()
Run Code Online (Sandbox Code Playgroud)

flatten指出“每个单独的流在进入下一个流之前都会耗尽。” 意味着Future在前一个完成之前不会轮询。在我的本地分析中,这似乎比buffered方法快 80% 。


我上面的两个答案都会产生一个Stream结果,其中每个源Future都按顺序轮询并返回结果。提问者实际要求的只是最后一个Future,而不是每个来源的结果Future,如果是这种情况,Stefan 的答案可能更有用,并且证明具有更好的性能。

  • 我认为“buffered”的开销很高(每次轮询内部未来时,中间应该有一些“Arc”对象传递给“with_notify”)。拳击可能比“buffered(1)”便宜。 (2认同)

Ste*_*fan 5

结合iter_okStream::for_each

use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);
Run Code Online (Sandbox Code Playgroud)

iter_ok产生传递的项目流,并且从不抛出错误(这就是为什么有时需要修复错误类型的原因)。传递给的闭包for_each然后Future为每个项目返回一个运行-在这里只是传递的项目。

for_each然后驱动每个返回的将来完成,然后再按您的意愿移至下一个。它还会因遇到的第一个错误而中止,并且需要内部期货才能()成功返回。

for_each本身会返回Future失败(如上文所述)或()完成后返回的。

use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);
Run Code Online (Sandbox Code Playgroud)