Java 8并行流中的自定义线程池

Luk*_*kas 371 java parallel-processing concurrency java-8 java-stream

是否可以为Java 8 并行流指定自定义线程池?我找不到任何地方.

想象一下,我有一个服务器应用程序,我想使用并行流.但是应用程序很大且是多线程的,因此我想将它划分为区分.我不想在另一个模块的应用程序块任务的一个模块中执行缓慢的任务.

如果我不能为不同的模块使用不同的线程池,这意味着我无法在大多数现实情况下安全地使用并行流.

请尝试以下示例.在单独的线程中执行一些CPU密集型任务.这些任务利用并行流.第一个任务被破坏,因此每个步骤需要1秒(通过线程休眠模拟).问题是其他线程卡住并等待损坏的任务完成.这是一个人为的例子,但想象一下servlet应用程序和有人向共享fork连接池提交长时间运行的任务.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}
Run Code Online (Sandbox Code Playgroud)

Luk*_*kas 364

实际上有一个技巧如何在特定的fork-join池中执行并行操作.如果将其作为fork-join池中的任务执行,它将保留在那里并且不使用公共任务.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}
Run Code Online (Sandbox Code Playgroud)

诀窍是基于ForkJoinTask.fork,它指定:"安排在当前任务运行的池中异步执行此任务(如果适用),或者如果不是inForkJoinPool()则使用ForkJoinPool.commonPool()"

  • 有关解决方案的详细信息,请参见http://blog.krecan.net/2014/03/18/how-to-specify-thread-pool-for-java-8-parallel-streams/ (19认同)
  • @Lukas感谢您的片段.我将补充一点,`ForkJoinPool`实例应该是`shutdown()`,因为不再需要它来避免线程泄漏.[(实施例)](https://github.com/jacek-rzrz/ForkJoinPool-thread-leak) (6认同)
  • @Lukas不,它不起作用.请访问http://stackoverflow.com/q/36947336/3645944 (6认同)
  • 请注意,Java 8 中存在一个错误,即使任务在自定义池实例上运行,它们仍然耦合到共享池:计算的大小仍然与公共池而不是自定义池成比例。在 Java 10 中修复:[JDK-8190974](https://bugs.openjdk.java.net/browse/JDK-8190974) (5认同)
  • 但它是否也指定了流使用`ForkJoinPool`或者是实现细节?链接到文档会很好. (3认同)
  • @terran 此问题也已针对 Java 8 https://bugs.openjdk.java.net/browse/JDK-8224620 修复 (3认同)
  • 仅供参考 - JDK 团队根本不喜欢这个解决方案 - 请参阅:https://mail.openjdk.java.net/pipermail/jdk-dev/2019-October/003438.html https://mail.openjdk。 java.net/pipermail/jdk-dev/2019-October/003440.html https://mail.openjdk.java.net/pipermail/jdk-dev/2019-October/003443.html (2认同)

ass*_*ias 180

并行流使用默认值ForkJoinPool.commonPool,默认情况下,当您拥有处理器时,默认情况下只有一个线程(由此返回)Runtime.getRuntime().availableProcessors()(这意味着并行流使用所有处理器,因为它们也使用主线程):

对于需要单独或自定义池的应用程序,可以使用给定的目标并行度级别构造ForkJoinPool; 默认情况下,等于可用处理器的数量.

这也意味着如果您同时启动嵌套并行流或多个并行流,它们将共享同一个池.优点:您永远不会使用超过默认值(可用处理器数量).缺点:您可能无法为您启动的每个并行流分配"所有处理器"(如果您碰巧有多个).(显然你可以使用ManagedBlocker来规避它.)

要更改并行流的执行方式,您也可以

  • 将并行流执行提交给您自己的ForkJoinPool:yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
  • 您可以使用系统属性更改公共池的大小:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")对于20个线程的目标并行度.

我的机器上有后者的例子有8个处理器.如果我运行以下程序:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});
Run Code Online (Sandbox Code Playgroud)

输出是:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

因此,您可以看到并行流一次处理8个项目,即它使用8个线程.但是,如果我取消注释注释行,则输出为:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

这次,并行流使用了20个线程,并且流中的所有20个元素已同时处理.

  • `commonPool`实际上比`availableProcessors`少一个,导致总并行度等于`availableProcessors`,因为调用线程计为1. (25认同)
  • 我不相信`ForkJoinPool.submit(()-&gt; stream.forEach(...))`将使用给定的`ForkJoinPool`运行我的Stream动作。我希望整个Stream-Action在ForJoinPool中作为一个动作执行,但是在内部仍在使用默认/通用的ForkJoinPool。您在哪里看到ForkJoinPool.submit()会按照您说的去做? (4认同)
  • 提交返回`ForkJoinTask`.要模仿`parallel()``get()`是需要的:`stream.parallel().forEach(soSomething)).get();` (2认同)
  • 我现在看到/sf/answers/2445158201/很好地说明了它实际上如声明的那样工作。但是我还是不明白它是如何工作的。但我对“有效”感到满意。谢谢! (2认同)
  • 我建议恢复 Tod Casasent 的编辑,因为 JDK-8190974 中没有任何内容表明 `System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", …)` 将不再起作用,并且从 JDK 18 开始,它仍然有效按预期工作。 (2认同)

Mar*_*sco 36

除了在您自己的forkJoinPool中触发并行计算的技巧之外,您还可以将该池传递给CompletableFuture.supplyAsync方法,如:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);
Run Code Online (Sandbox Code Playgroud)


Tod*_*ent 21

使用ForkJoinPool并提交并行流不能可靠地使用所有线程.如果你看一下(来自HashSet的并行流不并行运行)和这个(为什么并行流不使用ForkJoinPool的所有线程?),你会看到推理.

简短版本:如果ForkJoinPool/submit不适合您,请使用

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
    list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
Run Code Online (Sandbox Code Playgroud)


Kay*_*ayV 10

我们可以使用以下属性更改默认并行度:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16
Run Code Online (Sandbox Code Playgroud)

可以设置使用更多并行性。


cha*_*lie 8

要测量实际使用的线程数,您可以检查Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();
Run Code Online (Sandbox Code Playgroud)

这可以在4核CPU上产生如下输出:

5 // common pool
23 // custom pool
Run Code Online (Sandbox Code Playgroud)

没有.parallel()它给出:

3 // common pool
4 // custom pool
Run Code Online (Sandbox Code Playgroud)

  • Thread.activeCount()不会告诉您哪些线程正在处理您的流.映射到Thread.currentThread().getName(),然后是distinct().然后您将意识到不会使用池中的每个线程...为您的处理添加延迟,并且将利用池中的所有线程. (6认同)

Ste*_*stl 7

到目前为止,我使用了这个问题的答案中描述的解决方案.现在,我想出了一个名为Parallel Stream Support的小库:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())
Run Code Online (Sandbox Code Playgroud)

但正如@PabloMatiasGomez在评论中指出的那样,并行流的分裂机制存在缺陷,这在很大程度上取决于公共池的大小.请参阅来自HashSet的并行流不并行运行.

我使用此解决方案只是为不同类型的工作提供单独的池,但即使我不使用它,我也无法将公共池的大小设置为1.


小智 6

注意: JDK 10 中似乎实现了一个修复,可确保自定义线程池使用预期的线程数。

自定义 ForkJoinPool 中的并行流执行应遵循并行性 https://bugs.openjdk.java.net/browse/JDK-8190974


Grz*_*rek 5

如果您不想依赖实现技巧,总有一种方法可以通过实现结合语义的自定义收集器来实现相同的目的map...collect并且您不会局限于 ForkJoinPool:

list.stream()
  .collect(parallel(i -> process(i), executor, 4))
  .join()
Run Code Online (Sandbox Code Playgroud)

幸运的是,它已经在这里完成并可在 Maven Central 上使用: http: //github.com/pivovarit/parallel-collectors

免责声明:我写了它并承担责任。