标签: fork-join

fork/join框架如何比线程池更好?

使用新的fork/join框架而不仅仅是在开始时将大任务分成N个子任务,将它们发送到缓存的线程池(来自Executors)并等待每个任务完成,有什么好处?我没有看到使用fork/join抽象如何简化问题或使解决方案从我们多年来的工作中提高效率.

例如,教程示例中的并行化模糊算法可以像这样实现:

public class Blur implements Runnable {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    public void run() {
        computeDirectly();
    }

    protected void computeDirectly() {
        // As in the example, omitted for brevity
    } …
Run Code Online (Sandbox Code Playgroud)

java fork-join

128
推荐指数
8
解决办法
5万
查看次数

为什么在静态初始化程序中使用lambda的并行流导致死锁?

我遇到了一个奇怪的情况,在静态初始化程序中使用带有lambda的并行流看似永远没有CPU利用率.这是代码:

class Deadlock {
    static {
        IntStream.range(0, 10000).parallel().map(i -> i).count();
        System.out.println("done");
    }
    public static void main(final String[] args) {}
}
Run Code Online (Sandbox Code Playgroud)

这似乎是此行为的最小再现测试用例.如果我:

  • 将块放在main方法而不是静态初始化器中,
  • 删除并行化,或
  • 删除lambda,

代码立即完成.谁能解释这种行为?这是一个错误还是这个意图?

我正在使用OpenJDK版本1.8.0_66-internal.

java deadlock fork-join java-8 java-stream

82
推荐指数
3
解决办法
4222
查看次数

协调node.js中的并行执行

node.js的事件驱动编程模型使得协调程序流有点棘手.

简单的顺序执行变成了嵌套的回调,这很容易(虽然写下来有点复杂).

但并行执行怎么样?假设您有三个可以并行运行的任务A,B,C,当它们完成时,您希望将结果发送到任务D.

使用fork/join模型,这将是

  • 叉子A.
  • 叉子B.
  • 叉子C.
  • 加入A,B,C,运行D.

我如何在node.js中编写它?有没有最佳做法或烹饪书?我是否每次都必须手动滚动解决方案,或者是否有一些带帮助程序的库?

javascript parallel-processing concurrency fork-join node.js

79
推荐指数
3
解决办法
4万
查看次数

Java的Fork/Join vs ExecutorService - 何时使用哪个?

我刚读完这篇文章:Java-5 ThreadPoolExecutor相对于Java-7 ForkJoinPool有什么优势?觉得答案不够直接.

您能用简单的语言和示例来解释,Java 7的Fork-Join框架与旧解决方案之间的权衡取舍是什么?

我还阅读了Google关于Java提示的#1热门提示:何时javaworld.com 使用ForkJoinPool vs ExecutorService但文章没有回答标题问题,它主要讨论api差异......

java concurrency multithreading executorservice fork-join

52
推荐指数
6
解决办法
3万
查看次数

Fork/Join和Map/Reduce之间的区别

Fork/Join和Map/Reduce之间的主要区别是什么?

它们的分解和分布类型(数据与计算)有何不同?

mapreduce fork-join

42
推荐指数
2
解决办法
8455
查看次数

是什么决定了Java ForkJoinPool创建的线程数?

据我所知ForkJoinPool,该池创建了固定数量的线程(默认值:核心数),并且永远不会创建更多线程(除非应用程序通过使用表明需要这些线程managedBlock).

但是,使用ForkJoinPool.getPoolSize()我发现在创建30,000个任务(RecursiveAction)的程序中,ForkJoinPool执行这些任务平均使用700个线程(每次创建任务时计算的线程数).任务不做I/O,而是纯粹的计算; 唯一的任务间同步是调用ForkJoinTask.join()和访问AtomicBooleans,即没有线程阻塞操作.

因为join()不会像我理解的那样阻塞调用线程,所以没有理由为什么池中的任何线程都应该阻塞,所以(我曾经假设)应该没有理由创建任何进一步的线程(这显然发生了) .

那么,为什么要ForkJoinPool创建这么多线程呢?哪些因素决定了创建的线程数?

我曾希望这个问题可以在不发布代码的情况下得到解答,但在此请求.此代码摘自四倍大小的程序,简化为必要部分; 它不会按原样编译.如果需要,我当然也可以发布完整的程序.

程序使用深度优先搜索在迷宫中搜索从给定起点到给定终点的路径.保证存在解决方案.主要逻辑在以下compute()方法中SolverTask:A RecursiveAction从某个给定点开始,并继续从当前点可到达的所有邻居点.它不是SolverTask在每个分支点创建一个新的(这将创建太多的任务),而是将除了一个之外的所有邻居推送到后退堆栈以便稍后处理,并继续只有一个邻居没有被推送到堆栈.一旦它以这种方式达到死胡同,就会弹出最近推到回溯堆栈的点,并从那里继续搜索(相应地减少从taks起点构建的路径).一旦任务发现其回溯堆栈大于某个阈值,就会创建一个新任务; 从那时起,任务在继续从其回溯堆栈中弹出直到耗尽时,在到达分支点时不会将任何其他点推到其堆栈,而是为每个这样的点创建一个新任务.因此,可以使用堆栈限制阈值来调整任务的大小.

我上面引用的数字("30,000个任务,平均700个线程")来自于搜索5000x5000个单元格的迷宫.所以,这是基本代码:

class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000; …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing fork-join threadpool

42
推荐指数
4
解决办法
2万
查看次数

Observable.forkJoin和数组参数

在Observables forkJoin文档中,它说args可以是一个数组,但它没有列出这样做的例子:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md

我尝试过类似于我列出的功能(如下所示)但是想出了一个错误:

:3000/angular2/src/platform/browser/browser_adapter.js:76 

EXCEPTION: TypeError: Observable_1.Observable.forkJoin is not a function
Run Code Online (Sandbox Code Playgroud)

我的功能的剪切版本如下:

processStuff( inputObject ) {
  let _self = this;

  return new Observable(function(observer) {
    let observableBatch = [];

    inputObject.forEach(function(componentarray, key) {
      observableBatch.push(_self.http.get(key + '.json').map((res: Response) => res.json()));
    });

    Observable.forkJoin(
      observableBatch
    // );
    ).subscribe(() => {
      observer.next();
      observer.complete();
    });

  });
}
Run Code Online (Sandbox Code Playgroud)

我的问题的根与循环结束有关,然后按此处的要求继续:Angular2 Observable - 如何在循环之前等待循环中的所有函数调用结束?

但我还没有完全掌握forkJoin与数组的正确用法以及正确的语法.

我非常感谢您提供的帮助.

注意:返回可观察的第三功能示例

thirdFunction() {
  let _self = this;

  return Observable.create((observer) => {
  // return new Observable(function(observer) {
    ...

    observer.next(responseargs);
    observer.complete();
  });
}

processStuff(inputObject) …
Run Code Online (Sandbox Code Playgroud)

fork-join observable angular

39
推荐指数
1
解决办法
6万
查看次数

Java的fork-and-join线程池是否适合执行IO绑定任务?

在我的应用程序中,我必须通过执行许多network-io绑定任务来解决问题,有时一个io绑定任务并分成更小的io绑定任务.这些任务目前正在使用Java的标准线程池机制执行.我想知道我是否可以转向fork-and-join框架?但问题是,forkandjoin框架通常用于解决io绑定操作或CPU绑定吗?我假设它们主要用于CPU绑定操作,因为fork-and-join框架利用工作窃取技术来利用多核处理器,但如果我将它用于IO绑定任务,会不会有任何不利影响?

java fork-join

26
推荐指数
1
解决办法
4034
查看次数

Java ForkJoinPool具有非递归任务,是否可以正常工作?

我想Runnable通过一种方法将任务提交到ForkJoinPool:

forkJoinPool.submit(Runnable task)
Run Code Online (Sandbox Code Playgroud)

注意,我使用的是JDK 7.

在引擎盖下,它们被转换为ForkJoinTask对象.我知道当一个任务以递归方式分成较小的任务时,ForkJoinPool是有效的.

题:

如果没有递归,窃取工作仍然可以在ForkJoinPool中工作吗?

在这种情况下值得吗?

更新1: 任务很小,可能不平衡.即使对于严格相同的任务,诸如上下文切换,线程调度,停车,页面未命中等事情也会妨碍导致不平衡.

更新2: Doug Lea在Concurrency JSR-166兴趣小组中写道,给出了一个暗示:

当所有任务都是异步并提交到池而不是分叉时,这也极大地提高了吞吐量,这成为构造actor框架的合理方法,以及许多您可能使用ThreadPoolExecutor的普通服务.

我认为,当涉及到相当小的CPU绑定任务时,由于这种优化,ForkJoinPool是可行的方法.重点是这些任务已经很小,不需要递归分解.工作窃取工作,无论是大型还是小型任务 - 任务都可以被来自忙碌工人的Deque尾巴的另一个自由工作者抓住.

更新3: ForkJoinPool的可扩展性 - Akka乒乓球队的基准测试显示了很好的结果.

尽管如此,要更有效地应用ForkJoinPool需要进行性能调整.

java multithreading fork-join work-stealing forkjoinpool

25
推荐指数
1
解决办法
2512
查看次数

官方文档在哪里说Java的并行流操作使用fork/join?

以下是我对Java 8 的Stream框架的理解:

  1. 东西产生了源
  2. 该实现负责提供BaseStream#parallel()方法,该方法依次返回可以并行运行其操作的Stream.

虽然有人已经找到了一种方法来使用自定义线程池和Stream框架的并行执行,但我不能在Java 8 API中找到任何提及默认Java 8并行Stream实现将使用ForkJoinPool#commonPool()的内容.(Collection#parallelStream(),StreamSupport类中的方法,以及API中我不知道的其他可能的并行启用流源).

我只能搜索搜索结果的花絮是:


所以我的问题是:

在哪里说ForkJoinPool#commonPool()用于对从Java 8 API获得的流进行并行操作?

java fork-join java-8 forkjoinpool java-stream

24
推荐指数
2
解决办法
6295
查看次数