使用新的fork/join框架而不仅仅是在开始时将大任务分成N个子任务,将它们发送到缓存的线程池(来自Executors)并等待每个任务完成,有什么好处?我没有看到使用fork/join抽象如何简化问题或使解决方案从我们多年来的工作中提高效率.
例如,教程示例中的并行化模糊算法可以像这样实现:
public class Blur implements Runnable {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;
private int mBlurWidth = 15; // Processing window size, should be odd.
public ForkBlur(int[] src, int start, int length, int[] dst) {
mSource = src;
mStart = start;
mLength = length;
mDestination = dst;
}
public void run() {
computeDirectly();
}
protected void computeDirectly() {
// As in the example, omitted for brevity
} …
Run Code Online (Sandbox Code Playgroud) 我遇到了一个奇怪的情况,在静态初始化程序中使用带有lambda的并行流看似永远没有CPU利用率.这是代码:
class Deadlock {
static {
IntStream.range(0, 10000).parallel().map(i -> i).count();
System.out.println("done");
}
public static void main(final String[] args) {}
}
Run Code Online (Sandbox Code Playgroud)
这似乎是此行为的最小再现测试用例.如果我:
代码立即完成.谁能解释这种行为?这是一个错误还是这个意图?
我正在使用OpenJDK版本1.8.0_66-internal.
node.js的事件驱动编程模型使得协调程序流有点棘手.
简单的顺序执行变成了嵌套的回调,这很容易(虽然写下来有点复杂).
但并行执行怎么样?假设您有三个可以并行运行的任务A,B,C,当它们完成时,您希望将结果发送到任务D.
使用fork/join模型,这将是
我如何在node.js中编写它?有没有最佳做法或烹饪书?我是否每次都必须手动滚动解决方案,或者是否有一些带帮助程序的库?
javascript parallel-processing concurrency fork-join node.js
我刚读完这篇文章:Java-5 ThreadPoolExecutor相对于Java-7 ForkJoinPool有什么优势?觉得答案不够直接.
您能用简单的语言和示例来解释,Java 7的Fork-Join框架与旧解决方案之间的权衡取舍是什么?
我还阅读了Google关于Java提示的#1热门提示:何时从javaworld.com 使用ForkJoinPool vs ExecutorService但文章没有回答标题问题时,它主要讨论api差异......
Fork/Join和Map/Reduce之间的主要区别是什么?
它们的分解和分布类型(数据与计算)有何不同?
据我所知ForkJoinPool
,该池创建了固定数量的线程(默认值:核心数),并且永远不会创建更多线程(除非应用程序通过使用表明需要这些线程managedBlock
).
但是,使用ForkJoinPool.getPoolSize()
我发现在创建30,000个任务(RecursiveAction
)的程序中,ForkJoinPool
执行这些任务平均使用700个线程(每次创建任务时计算的线程数).任务不做I/O,而是纯粹的计算; 唯一的任务间同步是调用ForkJoinTask.join()
和访问AtomicBoolean
s,即没有线程阻塞操作.
因为join()
不会像我理解的那样阻塞调用线程,所以没有理由为什么池中的任何线程都应该阻塞,所以(我曾经假设)应该没有理由创建任何进一步的线程(这显然发生了) .
那么,为什么要ForkJoinPool
创建这么多线程呢?哪些因素决定了创建的线程数?
我曾希望这个问题可以在不发布代码的情况下得到解答,但在此请求.此代码摘自四倍大小的程序,简化为必要部分; 它不会按原样编译.如果需要,我当然也可以发布完整的程序.
程序使用深度优先搜索在迷宫中搜索从给定起点到给定终点的路径.保证存在解决方案.主要逻辑在以下compute()
方法中SolverTask
:A RecursiveAction
从某个给定点开始,并继续从当前点可到达的所有邻居点.它不是SolverTask
在每个分支点创建一个新的(这将创建太多的任务),而是将除了一个之外的所有邻居推送到后退堆栈以便稍后处理,并继续只有一个邻居没有被推送到堆栈.一旦它以这种方式达到死胡同,就会弹出最近推到回溯堆栈的点,并从那里继续搜索(相应地减少从taks起点构建的路径).一旦任务发现其回溯堆栈大于某个阈值,就会创建一个新任务; 从那时起,任务在继续从其回溯堆栈中弹出直到耗尽时,在到达分支点时不会将任何其他点推到其堆栈,而是为每个这样的点创建一个新任务.因此,可以使用堆栈限制阈值来调整任务的大小.
我上面引用的数字("30,000个任务,平均700个线程")来自于搜索5000x5000个单元格的迷宫.所以,这是基本代码:
class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000; …
Run Code Online (Sandbox Code Playgroud) 在Observables forkJoin文档中,它说args可以是一个数组,但它没有列出这样做的例子:
https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md
我尝试过类似于我列出的功能(如下所示)但是想出了一个错误:
:3000/angular2/src/platform/browser/browser_adapter.js:76
EXCEPTION: TypeError: Observable_1.Observable.forkJoin is not a function
Run Code Online (Sandbox Code Playgroud)
我的功能的剪切版本如下:
processStuff( inputObject ) {
let _self = this;
return new Observable(function(observer) {
let observableBatch = [];
inputObject.forEach(function(componentarray, key) {
observableBatch.push(_self.http.get(key + '.json').map((res: Response) => res.json()));
});
Observable.forkJoin(
observableBatch
// );
).subscribe(() => {
observer.next();
observer.complete();
});
});
}
Run Code Online (Sandbox Code Playgroud)
我的问题的根与循环结束有关,然后按此处的要求继续:Angular2 Observable - 如何在循环之前等待循环中的所有函数调用结束?
但我还没有完全掌握forkJoin与数组的正确用法以及正确的语法.
我非常感谢您提供的帮助.
thirdFunction() {
let _self = this;
return Observable.create((observer) => {
// return new Observable(function(observer) {
...
observer.next(responseargs);
observer.complete();
});
}
processStuff(inputObject) …
Run Code Online (Sandbox Code Playgroud) 在我的应用程序中,我必须通过执行许多network-io绑定任务来解决问题,有时一个io绑定任务并分成更小的io绑定任务.这些任务目前正在使用Java的标准线程池机制执行.我想知道我是否可以转向fork-and-join框架?但问题是,forkandjoin框架通常用于解决io绑定操作或CPU绑定吗?我假设它们主要用于CPU绑定操作,因为fork-and-join框架利用工作窃取技术来利用多核处理器,但如果我将它用于IO绑定任务,会不会有任何不利影响?
我想Runnable
通过一种方法将任务提交到ForkJoinPool:
forkJoinPool.submit(Runnable task)
Run Code Online (Sandbox Code Playgroud)
注意,我使用的是JDK 7.
在引擎盖下,它们被转换为ForkJoinTask对象.我知道当一个任务以递归方式分成较小的任务时,ForkJoinPool是有效的.
题:
如果没有递归,窃取工作仍然可以在ForkJoinPool中工作吗?
在这种情况下值得吗?
更新1: 任务很小,可能不平衡.即使对于严格相同的任务,诸如上下文切换,线程调度,停车,页面未命中等事情也会妨碍导致不平衡.
更新2: Doug Lea在Concurrency JSR-166兴趣小组中写道,给出了一个暗示:
当所有任务都是异步并提交到池而不是分叉时,这也极大地提高了吞吐量,这成为构造actor框架的合理方法,以及许多您可能使用ThreadPoolExecutor的普通服务.
我认为,当涉及到相当小的CPU绑定任务时,由于这种优化,ForkJoinPool是可行的方法.重点是这些任务已经很小,不需要递归分解.工作窃取工作,无论是大型还是小型任务 - 任务都可以被来自忙碌工人的Deque尾巴的另一个自由工作者抓住.
更新3: ForkJoinPool的可扩展性 - Akka乒乓球队的基准测试显示了很好的结果.
尽管如此,要更有效地应用ForkJoinPool需要进行性能调整.
以下是我对Java 8 的Stream框架的理解:
虽然有人已经找到了一种方法来使用自定义线程池和Stream框架的并行执行,但我不能在Java 8 API中找到任何提及默认Java 8并行Stream实现将使用ForkJoinPool#commonPool()的内容.(Collection#parallelStream(),StreamSupport类中的方法,以及API中我不知道的其他可能的并行启用流源).
我只能搜索搜索结果的花絮是:
Lambda的状态:Libraries Edition("引擎盖下的并行")
Vaguely提到Stream框架和Fork/Join机制.
Fork/Join机器旨在实现此过程的自动化.
JEP 107:集合的批量数据操作
几乎直接表明Collection接口的默认方法#parallelStream()使用Fork/Join实现自身.但仍然没有关于公共池.
并行实现基于Java 7中引入的java.util.concurrency Fork/Join实现.
类数组(Javadoc)
直接表示使用公共池的多次.
ForkJoin公共池用于执行任何并行任务.
所以我的问题是:
在哪里说ForkJoinPool#commonPool()用于对从Java 8 API获得的流进行并行操作?
fork-join ×10
java ×7
concurrency ×2
forkjoinpool ×2
java-8 ×2
java-stream ×2
angular ×1
deadlock ×1
javascript ×1
mapreduce ×1
node.js ×1
observable ×1
threadpool ×1