相关疑难解决方法(0)

Java 8并行流中的自定义线程池

是否可以为Java 8 并行流指定自定义线程池?我找不到任何地方.

想象一下,我有一个服务器应用程序,我想使用并行流.但是应用程序很大且是多线程的,因此我想将它划分为区分.我不想在另一个模块的应用程序块任务的一个模块中执行缓慢的任务.

如果我不能为不同的模块使用不同的线程池,这意味着我无法在大多数现实情况下安全地使用并行流.

请尝试以下示例.在单独的线程中执行一些CPU密集型任务.这些任务利用并行流.第一个任务被破坏,因此每个步骤需要1秒(通过线程休眠模拟).问题是其他线程卡住并等待损坏的任务完成.这是一个人为的例子,但想象一下servlet应用程序和有人向共享fork连接池提交长时间运行的任务.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing concurrency java-8 java-stream

371
推荐指数
9
解决办法
15万
查看次数

官方文档在哪里说Java的并行流操作使用fork/join?

以下是我对Java 8 的Stream框架的理解:

  1. 东西产生了源
  2. 该实现负责提供BaseStream#parallel()方法,该方法依次返回可以并行运行其操作的Stream.

虽然有人已经找到了一种方法来使用自定义线程池和Stream框架的并行执行,但我不能在Java 8 API中找到任何提及默认Java 8并行Stream实现将使用ForkJoinPool#commonPool()的内容.(Collection#parallelStream(),StreamSupport类中的方法,以及API中我不知道的其他可能的并行启用流源).

我只能搜索搜索结果的花絮是:


所以我的问题是:

在哪里说ForkJoinPool#commonPool()用于对从Java 8 API获得的流进行并行操作?

java fork-join java-8 forkjoinpool java-stream

24
推荐指数
2
解决办法
6295
查看次数

嵌套的Java 8并行forEach循环表现不佳.这种行为有望吗?

注意:我已经在另一个SO帖子中解决了这个问题 - 在嵌套的Java 8并行流动作中使用信号量可能是DEADLOCK.这是一个错误吗? - 但是这篇文章的标题表明问题与使用信号量有关 - 这有点分散了讨论的注意力.我正在创建这个,以强调嵌套循环可能有性能问题 - 虽然这两个问题可能是一个共同的原因(也许是因为我花了很多时间来弄清楚这个问题).(我不认为它是重复的,因为它强调另一种症状 - 但如果你只是删除它).

问题:如果嵌套两个Java 8 stream.parallel().forEach循环并且所有任务都是独立的,无状态的等等 - 除了提交到公共FJ池 - 然后在并行循环内嵌套并行循环执行得更差而不是在并行循环内嵌套顺序循环.更糟糕的是:如果同步包含内循环的操作,您将获得DEADLOCK.

演示性能问题

如果没有"同步",您仍然可以观察到性能问题.您可以在以下网址找到演示代码:http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java (有关更详细的说明,请参阅JavaDoc).

我们的设置如下:我们有一个嵌套的stream.parallel().forEach().

  • 内环是独立的(无状态,无干扰等 - 除了使用公共池之外)并且在最坏的情况下总共消耗1秒,即如果处理顺序.
  • 外循环的一半任务在该循环之前消耗10秒.
  • 在该循环之后,一半消耗10秒.
  • 因此,每个线程总共消耗11秒(最坏情况).*我们有一个布尔值,允许将内部循环从parallel()切换到sequential().

现在:将24个外循环任务提交给具有并行性的池8我们期望24/8*11 =最多33秒(在8核或更好的机器上).

结果是:

  • 内部顺序循环:33秒.
  • 内部并行循环:> 80秒(我有92秒).

问题:你能证实这种行为吗?这是人们对框架的期望吗?(我现在更加小心,声称这是一个错误,但我个人认为这是由于ForkJoinTask的实现中的一个错误.备注:我已将此发布到并发兴趣(请参阅http:// cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html),但到目前为止我没有得到确认).

证明了僵局

以下代码将为DEADLOCK

    // Outer loop
    IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
        doWork();
        synchronized(this) {
            // Inner loop
            IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
                doWork();
            });
        }
    });
Run Code Online (Sandbox Code Playgroud)

其中numberOfTasksInOuterLoop = 24,numberOfTasksInInnerLoop = 240,outerLoopOverheadFactor = 10000和 …

java parallel-processing concurrency java-8 java-stream

19
推荐指数
1
解决办法
7911
查看次数

ForkJoinPool.commonPool()相当于没有池吗?

我已经确定使用并行流确实比我的数据集的串行流更快.话虽如此,我想知道在这个问题中讨论使用的ForkJoinPool:Java 8并行流中的自定义线程池.

鉴于,

void foo()
{
     barCollection.parallelStream()  … do something with the stream
}
Run Code Online (Sandbox Code Playgroud)

相对于哪个池将使用1和2以下?

1)

ForkJoinPool.commonPool().submit(()->foo()).get();
Run Code Online (Sandbox Code Playgroud)

2)

foo();
Run Code Online (Sandbox Code Playgroud)

如果答案是肯定的,那么为什么该ForkJoinPol.commonPool()方法存在?

java multithreading java-8

10
推荐指数
1
解决办法
3586
查看次数