列出<Future>到Future <List>序列

Jat*_*tin 67 java concurrency java-8 completable-future

我想转换List<CompletableFuture<X>>CompletableFuture<List<T>>.这非常有用,因为当您有许多异步任务并且需要获得所有异步任务的结果时.

如果其中任何一个失败,则最终的未来将失败.这就是我实施的方式:

  public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
        if(com.isEmpty()){
            throw new IllegalArgumentException();
        }
        Stream<? extends CompletableFuture<T>> stream = com.stream();
        CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
        return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
            x.add(y);
            return x;
        },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
            ls1.addAll(ls2);
            return ls1;
        },exec));
    }
Run Code Online (Sandbox Code Playgroud)

要运行它:

ExecutorService executorService = Executors.newCachedThreadPool();
        Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return x;
        }, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
Run Code Online (Sandbox Code Playgroud)

如果其中任何一个失败则失败.即使有一百万个期货,它也能按预期产出.我遇到的问题是:如果有超过5000个期货,如果其中任何一个失败,我得到一个StackOverflowError:

java.util.concurrent.CompletableFuture中的java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)中的线程"pool-1-thread-2611"java.lang.StackOverflowError中的异常$ ThenCompose.run(CompletableFuture.java) :1487)java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)at java.util.concurrent.CompletableFuture $ ThenCompose.run( CompletableFuture.java:1487)

我做错了什么?

注意:当任何未来失败时,上述返回的未来将失败.接受的答案也应该采取这一点.

Mis*_*sha 82

用途CompletableFuture.allOf(...):

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}
Run Code Online (Sandbox Code Playgroud)

关于您的实施的一些评论:

您使用的.thenComposeAsync,.thenApplyAsync并且.thenCombineAsync很可能没有做你的期望.这些...Async方法在单独的线程中运行提供给它们的函数.因此,在您的情况下,您正在将新项添加到列表中以在提供的执行程序中运行.没有必要将轻量级操作填充到缓存的线程执行程序中.thenXXXXAsync没有充分理由不要使用方法.

此外,reduce不应该用于累积到可变容器中.即使流顺序时它可能正常工作,但如果要使流并行,它将失败.要执行可变缩减,请.collect改为使用.

如果要在第一次失败后立即异常完成整个计算,请在sequence方法中执行以下操作:

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;
Run Code Online (Sandbox Code Playgroud)

另外,如果要在第一次失败时取消剩余的操作,请在exec.shutdownNow();之后添加result.completeExceptionally(ex);.当然,这假设exec只存在于这一个计算中.如果没有,你将不得不循环并取消每个剩余的Future单独.

  • @AbhijitSarkar 这些任务不是由“join”调用的。使用 allOf 的好处是,当 allOf 触发时,所有任务都已完成,而 join 只是获取结果。 (2认同)

Hol*_*ger 11

正如Misha指出的那样,你过度使用…Async操作.此外,您正在编写一个复杂的操作链,用于建立依赖关系,这不依赖于您的程序逻辑:

  • 你创建一个作业x,这取决于你的列表的第一个和第二个作业
  • 你创建一个x + 1的工作,这取决于工作x和你的清单的第三个工作
  • 你创建一个x + 2的工作,这取决于工作x + 1和你的清单的第四个工作
  • ...
  • 你创建一个x + 5000的工作,这取决于工作x + 4999和你的列表的最后一个工作

然后,可以递归地执行取消(显式地或由于异常)这个递归组合的作业,并且可能会失败StackOverflowError.这是依赖于实现的.

正如Misha已经显示的那样,有一种方法allOf可以让你模拟你的初衷,定义一个取决于你列表中所有工作的工作.

但是,值得注意的是,即使这样也没有必要.由于您使用的是无界线程池执行程序,因此您只需将收集结果的异步作业发布到列表中即可完成.无论如何,通过询问每项工作的结果来暗示等待完成.

ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
  .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
    return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
    () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
    executorService);
Run Code Online (Sandbox Code Playgroud)

当线程数量有限且作业可能产生额外的异步作业时,使用编写相关操作的方法很重要,以避免等待作业从必须首先完成的作业中窃取线程,但这里也不是这种情况.

在这种特定情况下,一个作业简单地迭代大量的先决条件作业并且在必要时等待可能比对大量依赖性进行建模并使每个作业通知从属作业有关完成更有效.

  • 需要注意的是,使用`supplyAsync`而不是`allOf`将使用池中的线程来等待所有任务的完成.如果我没有弄错的话,`allOf`将在分配给各个任务的线程内运行.对于大多数用例来说并不是什么大问题,但值得注意. (2认同)

osk*_*vli 7

您可以获得Spotify的CompletableFutures库和使用allAsList方法.我认为它的灵感来自Guava的Futures.allAsList方法.

public static <T> CompletableFuture<List<T>> allAsList(
    List<? extends CompletionStage<? extends T>> stages) {
Run Code Online (Sandbox Code Playgroud)

如果您不想使用库,这是一个简单的实现:

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}
Run Code Online (Sandbox Code Playgroud)


Joh*_*ean 5

在 CompletableFuture 上使用 thenCombine 的示例序列操作

public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){

    CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());

    BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList = 
            (acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;});

    BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ;  

    return com.stream()
              .reduce(identity,
                      combineToList,
                      combineLists);  

   }
} 
Run Code Online (Sandbox Code Playgroud)

如果您不介意使用第 3 方库cyclops-react(我是作者)有一组用于 CompletableFutures(以及 Optionals、Streams 等)的实用方法

  List<CompletableFuture<String>> listOfFutures;

  CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);
Run Code Online (Sandbox Code Playgroud)


Jat*_*tin 5

为了加上@Misha 接受的答案,它可以进一步扩展为收集器:

 public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
    return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}
Run Code Online (Sandbox Code Playgroud)

现在你可以:

Stream<CompletableFuture<Integer>> stream = Stream.of(
    CompletableFuture.completedFuture(1),
    CompletableFuture.completedFuture(2),
    CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());
Run Code Online (Sandbox Code Playgroud)