以下是我对Java 8 的Stream框架的理解:
虽然有人已经找到了一种方法来使用自定义线程池和Stream框架的并行执行,但我不能在Java 8 API中找到任何提及默认Java 8并行Stream实现将使用ForkJoinPool#commonPool()的内容.(Collection#parallelStream(),StreamSupport类中的方法,以及API中我不知道的其他可能的并行启用流源).
我只能搜索搜索结果的花絮是:
Lambda的状态:Libraries Edition("引擎盖下的并行")
Vaguely提到Stream框架和Fork/Join机制.
Fork/Join机器旨在实现此过程的自动化.
JEP 107:集合的批量数据操作
几乎直接表明Collection接口的默认方法#parallelStream()使用Fork/Join实现自身.但仍然没有关于公共池.
并行实现基于Java 7中引入的java.util.concurrency Fork/Join实现.
类数组(Javadoc)
直接表示使用公共池的多次.
ForkJoin公共池用于执行任何并行任务.
所以我的问题是:
在哪里说ForkJoinPool#commonPool()用于对从Java 8 API获得的流进行并行操作?
我读到了Java 7中引入的Fork/Join框架的实现,我只是想检查一下我是否理解了魔法是如何工作的.
据我所知,当一个线程分叉时,它会在其队列中创建子任务(其他线程可能会或可能不会被窃取).当线程尝试"加入"时,它实际上检查其队列中的现有任务,然后递归执行它们,这意味着对于任何"连接"操作 - 将在线程调用堆栈中添加2个帧(一个用于连接,一个用于连接)对于新的任务调用).
据我所知,JVM不支持尾调用优化(在这种情况下可以用于删除连接方法堆栈帧)我相信在执行带有大量分支和连接的复杂操作时,线程可能会抛出StackOverflowError
.
我是对的还是他们找到了一些防止它的好方法?
编辑
这是一个帮助澄清问题的场景:说(为简单起见)我们在forkjoin池中只有一个线程.在某个时间点 - 线程分叉然后调用join.在join方法中,线程发现它可以执行分叉任务(因为它在队列中找到),因此它调用下一个任务.此任务依次分叉然后调用join - 因此在执行join方法时,线程将在其队列中找到分叉任务(如前所述)并调用它.在该阶段,调用堆栈将至少包含两个连接和两个任务的帧.
正如您所看到的,fork join框架转换为普通递归.因为java不支持尾调用优化 - java中的每次递归都会导致StackOverflowError
它变得足够深.
我的问题是 - fork/join框架的实现者是否找到了防止这种情况的一些很酷的方法.
从理论上讲,使用Scala Actor Framework可以像JDK 7的Fork-Join框架一样进行一种异步的分治计算吗?如果是这样,我怎么能用框架表达FJ问题 - 例如,教程mergesort概念?我们欢迎代码小贴士.
一个线程死锁饥饿如果池中的所有线程都在等待同一个池中排队的任务完成发生在一个正常的线程池. ForkJoinPool
通过从join()
调用内部窃取其他线程的工作来避免这个问题,而不是简单地等待.例如:
private static class ForkableTask extends RecursiveTask<Integer> {
private final CyclicBarrier barrier;
ForkableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
protected Integer compute() {
try {
barrier.await();
return 1;
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
@Test
public void testForkJoinPool() throws Exception {
final int parallelism = 4;
final ForkJoinPool pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
for (int …
Run Code Online (Sandbox Code Playgroud) java concurrency multithreading java.util.concurrent fork-join
使用中的低级差异是什么:
ForkJoinPool = new ForkJoinPool(X);
Run Code Online (Sandbox Code Playgroud)
和
ExecutorService ex = Executors.neWorkStealingPool(X);
Run Code Online (Sandbox Code Playgroud)
其中X是所需的并行级别,即线程运行..
根据文档我发现它们相似.还告诉我哪一个在任何正常用途下更合适和安全.我有1.3亿个条目写入BufferedWriter并使用Unix排序按第1列排序.
另请告诉我如果可能的话要保留多少个线程.
注意:我的系统有 8个核心处理器和 32 GB RAM.
multithreading executorservice fork-join executors forkjoinpool
这对于今天另一个问题的回答是一个"副作用" .这更多是关于好奇心而不是实际问题.
Java SE 7提供了Oracle称之为"fork/join框架"的东西.这是将工作安排到多个处理器的一种可能的优秀方式.虽然我理解它应该如何工作,但我无法理解它优越的地方和关于偷工作的说法.
也许其他人更深入地了解为什么这种方法是可取的(除了因为它有一个奇特的名字).
/加盟叉的根本原语ForkJoinTask
s,这是Future
S,而这个想法是要么执行工作立即[原文](措辞是误导,因为"立即"意味着它同步发生在主线程,在现实中发生这种情况的内部a Future
)低于某个阈值或递归地将工作划分为两个任务,直到达到阈值.
未来是一种封装任务的概念,该任务以不透明和未指定的方式异步运行到对象中.您有一个函数可以验证结果是否可用,并且您获得了一个允许您(等待和)检索结果的函数.
严格地说,你甚至不知道未来是否异步运行,它可以在内部执行get()
.从理论上讲,实现可以为每个未来生成一个线程或使用线程池.
实际上,Java将future作为任务队列上的任务实现,并附加了一个线程池(对于整个fork/join框架也是如此).
fork/join文档给出了这个具体的用法示例:
protected void compute() {
if (mLength < sThreshold) {
computeDirectly();
return;
}
int split = mLength / 2;
invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
new ForkBlur(mSource, mStart + split, mLength - split,
mDestination));
}
Run Code Online (Sandbox Code Playgroud)
这将以与Mergesort将如何遍历它们的方式相同的方式将任务提交给底层线程池的任务队列(由于递归).
比如说我们有一个32个"项目"的数组要处理并且阈值为4,并且均匀分割,它将产生8个任务,每个具有4个"项目",看起来像这样:
00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 …
Run Code Online (Sandbox Code Playgroud) java concurrency multithreading java.util.concurrent fork-join
当我遇到这个构造函数时,我正在研究关于Fork/Join框架的oracle文档ForkJoinPool
:ForkJoinPool(int parallelism)
.文档说这是并行级别,默认情况下等于可用处理器的数量.谁能告诉我如何使用它来提高程序的速度和效率?
我正在使用Rxjs
中的angualr-cli
应用程序.
在viewer.component.ts中
//Other Imports
import { Observable } from 'rxjs/Observable';
//omitting for brevity
export class ViewerComponent implements OnInit, AfterViewInit, OnDestroy {
someFunction(someArg){
//omitting for brevity
let someArray: any = [];
//Add some info
Observable.forkJoin(someArray).subscribe(data => {
//Do something with data
});
}
//omitting for brevity
}
Run Code Online (Sandbox Code Playgroud)
我得到了错误
ERROR TypeError: __WEBPACK_IMPORTED_MODULE_2_rxjs_Observable__.Observable.forkJoin is not a function
at ViewerComponent.webpackJsonp../src/app/component/viewer.component.ts.ViewerComponent.someFunction(http://localhost:4200/main.bundle.js:4022:73)
at http://localhost:4200/main.bundle.js:3951:31
Run Code Online (Sandbox Code Playgroud)
但如果我Rxjs
完全导入(import 'rxjs';
)一切正常.没错.我似乎还明白了什么是必要的.我也尝试导入rxjs/Observable/forkjoin
但没有任何可用的.
关于如何解决这个问题的任何指示.
我有一个长期运行的计算,我需要执行一长串输入.计算是独立的,所以我想将它们分配给几个CPU.我使用的是Java 8.
代码的框架如下所示:
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
MyService myService = new MyService(executorService);
List<MyResult> results =
myInputList.stream()
.map(myService::getResultFuture)
.map(CompletableFuture::join)
.collect(Collectors.toList());
executorService.shutdown();
Run Code Online (Sandbox Code Playgroud)
负责计算的主要功能如下:
CompletableFuture<MyResult> getResultFuture(MyInput input) {
return CompletableFuture.supplyAsync(() -> longCalc(input), executor)))
}
Run Code Online (Sandbox Code Playgroud)
长时间运行的计算是无状态的,并且不执行任何IO.
我希望这段代码能够使用所有可用的CPU,但它不会发生.例如,在具有72个CPU和numThreads=72
(或甚至例如numThreads=500
)的机器上,CPU使用率最多为500-1000%,如htop所示:
根据线程转储,许多计算线程都在等待,即:
"pool-1-thread-34" #55 prio=5 os_prio=0 tid=0x00007fe858597890 nid=0xd66 waiting on condition [0x00007fe7f9cdd000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000381815f20> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- …
Run Code Online (Sandbox Code Playgroud) 我知道设置并行处理会有开销,Stream
如果项目很少或者每个项目的处理速度很快,单个线程中的处理速度会更快.
但是,是否存在类似的阈值trySplit()
,将问题分解为较小的块会产生相反的效果?我在想类似于合并排序切换到最小块的插入排序.
如果是这样,阈值是否取决于过程中物品的相对成本trySplit()
和消耗tryAdvance()
?例如,考虑一个分裂操作,它比推进数组索引要复杂得多 - 分割一个词法排序的多集排列.是否存在允许客户在创建并行流时指定拆分下限的约定,具体取决于其使用者的复杂程度?启发式Spliterator
可以用来估计下限本身吗?
或者,或者,让a的下限为Spliterator
1 是否总是安全的,让工作窃取算法负责选择是否继续拆分?
fork-join ×10
java ×7
concurrency ×5
forkjoinpool ×2
java-8 ×2
actor ×1
angular ×1
angular-cli ×1
executors ×1
java-7 ×1
java-stream ×1
rxjs5 ×1
scala ×1
spliterator ×1