如何加入向量中的所有期货而不会像 join_all 那样取消失败?

stu*_*stu 5 asynchronous rust async-await

Vec通过调用async函数创建了一个期货。将所有期货添加到向量后,我想等待整个集合,获取结果列表或每个完成的回调。

我可以简单地循环或迭代期货向量并调用.await每个期货,这将允许我正确处理错误而不是futures::future::join_all取消其他错误,但我确信有一种更惯用的方法来完成这项任务。

我还希望能够在期货完成时处理它们,因此如果我从前几个期货中获得足够的信息,我可以取消剩余的未完成期货,而不是等待它们并丢弃它们的结果,无论是否出错。如果我按顺序迭代向量,这是不可能的。

我正在寻找的是一个回调(关闭等),它让我在结果进来时累积结果,以便我可以适当地处理错误或取消剩余的期货(从回调中),如果我确定我不这样做'不需要剩下的。

我可以看出这让借用检查器很头疼:试图Vec在异步引擎的回调中修改未来。

有许多 Stack Overflow 问题和 Reddit 帖子解释了如何join_all加入期货列表,但如果失败则取消其余的,以及异步引擎如何产生线程,或者它们可能不会产生,或者如果它们产生它们是糟糕的设计。

She*_*ter 13

使用futures::select_all

use futures::future; // 0.3.4

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn might_fail(fails: bool) -> Result<i32> {
    if fails {
        Ok(42)
    } else {
        Err("boom".into())
    }
}

async fn many() -> Result<i32> {
    let raw_futs = vec![might_fail(true), might_fail(false), might_fail(true)];
    let unpin_futs: Vec<_> = raw_futs.into_iter().map(Box::pin).collect();
    let mut futs = unpin_futs;

    let mut sum = 0;

    while !futs.is_empty() {
        match future::select_all(futs).await {
            (Ok(val), _index, remaining) => {
                sum += val;
                futs = remaining;
            }
            (Err(_e), _index, remaining) => {
                // Ignoring all errors
                futs = remaining;
            }
        }

        if sum > 42 {
            // Early exit
            return Ok(sum);
        }
    }

    Ok(sum)
}
Run Code Online (Sandbox Code Playgroud)

这将轮询集合中的所有 future,返回第一个未挂起的 future、其索引以及剩余的挂起或未轮询的 future。然后,您可以匹配Result并处理成功或失败的情况。

select_all在循环内部调用。这使您能够提前退出该功能。当您退出时,futs向量将被删除,并且删除 future 会取消它。