Chr*_*ies 7 java parallel-processing concurrency multithreading stream
我想全局替换Java并行流默认使用的公共线程池,例如for
IntStream.range(0,100).parallel().forEach(i -> {
doWork();
});
Run Code Online (Sandbox Code Playgroud)
我知道可以通过向专用线程池提交此类指令来使用专用的ForkJoinPool(请参阅Java 8并行流中的自定义线程池).这里的问题是
Executors.newFixedThreadPool(10)
?备注:我之所以要更换F/J池,是因为它似乎有一个错误,使其无法用于嵌套并行循环.
嵌套并行循环的性能很差,可能导致死锁,请参阅http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html
例如:以下代码导致死锁:
// Outer loop
IntStream.range(0,24).parallel().forEach(i -> {
// (omitted:) do some heavy work here (consuming majority of time)
// Need to synchronize for a small "subtask" (e.g. updating a result)
synchronized(this) {
// Inner loop (does s.th. completely free of side-effects, i.e. expected to work)
IntStream.range(0,100).parallel().forEach(j -> {
// do work here
});
}
});
Run Code Online (Sandbox Code Playgroud)
(即使在"在这里工作"没有任何额外的代码,因为并行性设置为<12).
我的问题是如何更换FJP.如果您想讨论嵌套并行循环,您可能会检查嵌套Java 8并行forEach循环执行不佳.这种行为有望吗?.
我认为这不是流API的使用方式.看起来你(错误地)使用它来执行并行任务执行(专注于任务,而不是数据),而不是进行并行流处理(专注于流中的数据).您的代码以某种方式违反了流的一些主要原则.(我正在以某种方式写'因为它不是真的被禁止但是气馁):避免状态和副作用.
除此之外(或者可能是因为副作用),你在外循环中使用了大量的同步,这是其他一切,但无害!
虽然文档中没有提到,但并行流在内部使用公共ForkJoinPool
流.无论这是否缺乏文件,我们都必须接受这一事实.在的JavaDoc的ForkJoinTask
状态:
可以定义和使用可能阻塞的ForkJoinTasks,但这样做需要进一步考虑:(1)如果任何其他任务应该依赖于阻止外部同步或I/O的任务,则完成很少.从未加入的事件类型异步任务(例如,那些子类化CountedCompleter)通常属于此类别.(2)为了尽量减少资源影响,任务应该很小; 理想情况下只执行(可能)阻止操作.(3)除非使用ForkJoinPool.ManagedBlocker API,或者已知可能阻塞的任务数小于池的ForkJoinPool.getParallelism级别,否则池无法保证有足够的线程可用于确保进度或良好性能.
同样,您似乎使用流来替代简单的for循环和执行程序服务.
n
并行执行任务,请使用ExecutionService
ForkJoinPool
(with ForkJoinTasks
)代替.(它确保了一定数量的线程而没有死锁的危险,因为等待其他任务完成的任务太多,因为等待任务不会阻塞其执行线程).不要混淆ExecutionService
和ForkJoinPool
.它们(通常)不是彼此的替代品!