如何(全局)替换Java并行流的公共线程池后端?

Chr*_*ies 7 java parallel-processing concurrency multithreading stream

我想全局替换Java并行流默认使用的公共线程池,例如for

IntStream.range(0,100).parallel().forEach(i -> {
    doWork();
});
Run Code Online (Sandbox Code Playgroud)

我知道可以通过向专用线程池提交此类指令来使用专用的ForkJoinPool(请参阅Java 8并行流中的自定义线程池).这里的问题是

  • 是否有可能通过其他实现替换常见的ForkJoinPool(比如说Executors.newFixedThreadPool(10)
  • 是否可以通过某些全局设置来实现,例如,某些JVM属性?

备注:我之所以要更换F/J池,是因为它似乎有一个错误,使其无法用于嵌套并行循环.

嵌套并行循环的性能很差,可能导致死锁,请参阅http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

例如:以下代码导致死锁:

// Outer loop
IntStream.range(0,24).parallel().forEach(i -> {

    // (omitted:) do some heavy work here (consuming majority of time)

    // Need to synchronize for a small "subtask" (e.g. updating a result)
    synchronized(this) {
        // Inner loop (does s.th. completely free of side-effects, i.e. expected to work)
        IntStream.range(0,100).parallel().forEach(j -> {
            // do work here
        });
    }
});
Run Code Online (Sandbox Code Playgroud)

(即使在"在这里工作"没有任何额外的代码,因为并行性设置为<12).

我的问题是如何更换FJP.如果您想讨论嵌套并行循环,您可能会检查嵌套Java 8并行forEach循环执行不佳.这种行为有望吗?.

isn*_*bad 6

我认为这不是流API的使用方式.看起来你(错误地)使用它来执行并行任务执行(专注于任务,而不是数据),而不是进行并行流处理(专注于流中的数据).您的代码以某种方式违反了流的一些主要原则.(我正在以某种方式写'因为它不是真的被禁止但是气馁):避免状态和副作用.

除此之外(或者可能是因为副作用),你在外循环中使用了大量的同步,这是其他一切,但无害!

虽然文档中没有提到,但并行流在内部使用公共ForkJoinPool流.无论这是否缺乏文件,我们都必须接受这一事实.在的JavaDoc的ForkJoinTask状态:

可以定义和使用可能阻塞的ForkJoinTasks,但这样做需要进一步考虑:(1)如果任何其他任务应该依赖于阻止外部同步或I/O的任务,则完成很少.从未加入的事件类型异步任务(例如,那些子类化CountedCompleter)通常属于此类别.(2)为了尽量减少资源影响,任务应该很小; 理想情况下只执行(可能)阻止操作.(3)除非使用ForkJoinPool.ManagedBlocker API,或者已知可能阻塞的任务数小于池的ForkJoinPool.getParallelism级别,否则池无法保证有足够的线程可用于确保进度或良好性能.

同样,您似乎使用流来替代简单的for循环和执行程序服务.

  • 如果您只想n并行执行任务,请使用ExecutionService
  • 如果您有一个更复杂的示例,其中任务正在创建子任务,请考虑使用ForkJoinPool(with ForkJoinTasks)代替.(它确保了一定数量的线程而没有死锁的危险,因为等待其他任务完成的任务太多,因为等待任务不会阻塞其执行线程).
  • 如果要(并行)处理数据,请考虑使用流API.
  • 您无法"安装"自定义公共池.它是在私有静态代码内部创建的.
  • 但是你可以使用某些系统属性来影响并行性,线程工厂和公共池的异常处理程序(参见ForkJoinPool的JavaDoc)

不要混淆ExecutionServiceForkJoinPool.它们(通常)不是彼此的替代品!