我有一个期货集合,我想合并为一个单一的期货,以使它们按顺序执行。
我调查了该futures_ordered功能。它似乎按顺序返回结果,但期货同时执行。
我尝试fold将期货与结合使用and_then。但是,这对于类型系统来说是棘手的。
let tasks = vec![ok(()), ok(()), ok(())];
let combined_task = tasks.into_iter().fold(
ok(()), // seed
|acc, task| acc.and_then(|_| task), // accumulator
);
Run Code Online (Sandbox Code Playgroud)
这给出了以下错误:
let tasks = vec![ok(()), ok(()), ok(())];
let combined_task = tasks.into_iter().fold(
ok(()), // seed
|acc, task| acc.and_then(|_| task), // accumulator
);
Run Code Online (Sandbox Code Playgroud)
我可能正在解决这个错误,但是我的想法已经用完了。
Stream具有buffered允许您限制同时轮询的期货数量的功能。
如果您有一组期货,您可以创建一个流并buffered像这样使用:
let tasks = vec![future1, future2];
let stream = ::futures::stream::iter_ok(tasks);
let mut when_result_ready = stream.buffered(1);
Run Code Online (Sandbox Code Playgroud)
when_result_ready现在将是一个Stream实现,它一次只轮询一个未来,并在每个未来完成后移动到下一个。
更新
根据评论和分析,它似乎buffered有很大的开销,所以另一种解决方案是将每个转换Future为 aStream和flatten它们:
iter_ok(tasks).map(|f|f.into_stream()).flatten()
Run Code Online (Sandbox Code Playgroud)
flatten指出“每个单独的流在进入下一个流之前都会耗尽。” 意味着Future在前一个完成之前不会轮询。在我的本地分析中,这似乎比buffered方法快 80% 。
我上面的两个答案都会产生一个Stream结果,其中每个源Future都按顺序轮询并返回结果。提问者实际要求的只是最后一个Future,而不是每个来源的结果Future,如果是这种情况,Stefan 的答案可能更有用,并且证明具有更好的性能。
use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;
let tasks = vec![ok(()), ok(()), ok(())];
let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);
Run Code Online (Sandbox Code Playgroud)
iter_ok产生传递的项目流,并且从不抛出错误(这就是为什么有时需要修复错误类型的原因)。传递给的闭包for_each然后Future为每个项目返回一个运行-在这里只是传递的项目。
for_each然后驱动每个返回的将来完成,然后再按您的意愿移至下一个。它还会因遇到的第一个错误而中止,并且需要内部期货才能()成功返回。
for_each本身会返回Future失败(如上文所述)或()完成后返回的。
use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;
let tasks = vec![ok(()), ok(()), ok(())];
let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);
Run Code Online (Sandbox Code Playgroud)