默认的ForkJoinPool执行器需要很长时间

Kay*_*ayV 6 executorservice java-8 threadpoolexecutor forkjoinpool completable-future

我正在使用CompletableFuture来异步执行从列表源生成的流.

所以我正在测试重载方法,即CompletableFuture的"supplyAsync",其中一个方法只接受单个供应商参数,另一个方法接受供应商参数和执行者参数.以下是两者的文档:

supplyAsync(供应商供应商)

返回由ForkJoinPool.commonPool()中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.

第二

supplyAsync(供应商供应商,执行执行人)

返回由给定执行程序中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.

这是我的测试类:

public class TestCompleteableAndParallelStream {

    public static void main(String[] args) {
        List<MyTask> tasks = IntStream.range(0, 10)
                .mapToObj(i -> new MyTask(1))
                .collect(Collectors.toList());

        useCompletableFuture(tasks);

        useCompletableFutureWithExecutor(tasks);

    }

    public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
          long start = System.nanoTime();
          ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
          List<CompletableFuture<Integer>> futures =
              tasks.stream()
                   .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
                   .collect(Collectors.toList());

          List<Integer> result =
              futures.stream()
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());
          long duration = (System.nanoTime() - start) / 1_000_000;
          System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
          System.out.println(result);
          executor.shutdown();
        }

    public static void useCompletableFuture(List<MyTask> tasks) {
          long start = System.nanoTime();
          List<CompletableFuture<Integer>> futures =
              tasks.stream()
                   .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
                   .collect(Collectors.toList());

          List<Integer> result =
              futures.stream()
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());
          long duration = (System.nanoTime() - start) / 1_000_000;
          System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
          System.out.println(result);
        }



}


class MyTask {
      private final int duration;
      public MyTask(int duration) {
        this.duration = duration;
      }
      public int calculate() {
        System.out.println(Thread.currentThread().getName());
        try {
          Thread.sleep(duration * 1000);
        } catch (final InterruptedException e) {
          throw new RuntimeException(e);
        }
        return duration;
      }
    }
Run Code Online (Sandbox Code Playgroud)

虽然"useCompletableFuture"方法需要大约4秒才能完成,"useCompletableFutureWithExecutor"方法只需1秒即可完成.

不,我的问题是,ForkJoinPool.commonPool()可以做什么不同的处理开销?在那应该不是我们总是喜欢自定义执行器池而不是ForkJoinPool?

Szy*_*iak 7

检查ForkJoinPool.commonPool()尺寸。默认情况下,它会创建一个大小为

Runtime.getRuntime().availableProcessors() - 1
Run Code Online (Sandbox Code Playgroud)

我在我的 Intel i7-4800MQ(4 个内核 + 4 个虚拟内核)上运行你的例子,在我的例子中公共池的大小是7,所以整个计算花费了大约 2000 毫秒:

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 2005 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
Run Code Online (Sandbox Code Playgroud)

在第二种情况下,你使用

Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
Run Code Online (Sandbox Code Playgroud)

所以池中有 10 个线程准备执行计算,所以所有任务都在大约 1000 毫秒内运行:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1002 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
Run Code Online (Sandbox Code Playgroud)

ForkJoinPool和之间的区别ExecutorService

尤金在他的评论中还提到了一件更重要的事情。ForkJoinPool使用工作窃取方法:

一个ForkJoinPool不同于其他类型的ExecutorService的主要凭借用人窃取的:所有的线程池中试图找到并执行其他活动任务提交到池和/或创建的任务(最终阻塞等待工作,如果不存在) . 当大多数任务产生其他子任务时(大多数 ForkJoinTasks 也是如此),以及当许多小任务从外部客户端提交到池时,这可以实现高效处理。特别是在构造函数中将 asyncMode 设置为 true 时,ForkJoinPools 也可能适用于从未加入的事件样式任务。

ExecutorService创建.newFixedThreadPool()使用分而治之的方法。

如何确定池大小?

有一个关于什么是最佳线程池大小的问题,您可能会在那里找到有用的信息:

设置线程池的理想大小

此外,这个线程是一个调查的好地方:

Java 8 并行流中的自定义线程池

  • @SzymonStepniak 您实际上需要很好地预热 VM 才能做出关于速度的任何 *理智* 结论,而此代码则没有。拥有*更多*线程然后实际CPU(甚至虚拟)是不好的。 (2认同)
  • @SzymonStepniak 同样,这些池如何工作的基本思想是完全不同的——一个划分工作(分而治之),另一个进行窃取——完全不同的实现 (2认同)
  • 必须强调的是,在这个人为的示例中,使用比内核数量更多的线程有很大帮​​助,其中作业由“Thread.sleep”组成。对于实际任务,公共池的默认并行度可能更合理。工作窃取在这里无关紧要,因为最终,工作线程在任何一种情况下都只处理排队的任务。两者都不在这里使用“分而治之的方法”。 (2认同)
  • @Eugene:我建议你重新阅读那里的帖子。首先,“偷工减料”和“分而治之”并不是矛盾的东西,其实两者通常都归结于F/J框架。由于其他线程池执行程序,例如通过 `newFixedThreadPool()` 构造的线程池执行程序只有一个队列,因此它们总是执行某种“工作窃取”,尽管这样称呼它没有多大意义,因为您需要本地队列将其称为实际窃取。另一方面,“分而治之”是一种解决问题的策略,您可以在任何执行者之上实施,但最好是在 F/J 之上。 (2认同)

Kay*_*ayV 5

进一步检查互联网上的解决方案,我发现我们可以使用以下属性更改 ForkJoinPool 采用的默认池大小:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16
Run Code Online (Sandbox Code Playgroud)

因此,此属性可以进一步帮助以更有效的方式和更多的并行性使用 ForkJoinPool。