相关疑难解决方法(0)

Java 8并行流中的自定义线程池

是否可以为Java 8 并行流指定自定义线程池?我找不到任何地方.

想象一下,我有一个服务器应用程序,我想使用并行流.但是应用程序很大且是多线程的,因此我想将它划分为区分.我不想在另一个模块的应用程序块任务的一个模块中执行缓慢的任务.

如果我不能为不同的模块使用不同的线程池,这意味着我无法在大多数现实情况下安全地使用并行流.

请尝试以下示例.在单独的线程中执行一些CPU密集型任务.这些任务利用并行流.第一个任务被破坏,因此每个步骤需要1秒(通过线程休眠模拟).问题是其他线程卡住并等待损坏的任务完成.这是一个人为的例子,但想象一下servlet应用程序和有人向共享fork连接池提交长时间运行的任务.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing concurrency java-8 java-stream

371
推荐指数
9
解决办法
15万
查看次数

在嵌套的Java 8并行流动作中使用信号量可能是DEADLOCK.这是一个错误吗?

考虑以下情况:我们使用Java 8并行流来执行并行forEach循环,例如,

IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})
Run Code Online (Sandbox Code Playgroud)

并行线程的数量由系统属性"java.util.concurrent.ForkJoinPool.common.parallelism"控制,通常等于处理器的数量.

现在假设我们想限制特定工作的并行执行次数 - 例如因为该部分是内存密集型而内存约束意味着并行执行的限制.

限制并行执行的一种明显而优雅的方法是使用信号量(这里建议),例如,下面的代码片段将并行执行的数量限制为5:

        final Semaphore concurrentExecutions = new Semaphore(5);
        IntStream.range(0,20).parallel().forEach(i -> {

            concurrentExecutions.acquireUninterruptibly();

            try {
                /* WORK DONE HERE */
            }
            finally {
                concurrentExecutions.release();
            }
        });
Run Code Online (Sandbox Code Playgroud)

这很好用!

但是:在worker(at /* WORK DONE HERE */)中使用任何其他并行流可能会导致死锁.

对我来说,这是一个意外的行为.

说明:由于Java流使用ForkJoin池,因此内部forEach正在分叉,并且连接似乎正在等待.但是,这种行为仍然是出乎意料的.请注意,如果设置"java.util.concurrent.ForkJoinPool.common.parallelism"为1 ,并行流甚至可以工作.

另请注意,如果存在内部并行forEach,则它可能不透明.

问题: 这种行为是否符合Java 8规范(在这种情况下,它意味着禁止在并行流工作者中使用信号量)或者这是一个错误?

为方便起见:下面是一个完整的测试用例.除了"true,true"之外,两个布尔值的任何组合都有效,这会导致死锁.

澄清:为了明确这一点,让我强调一个方面:acquire信号量不会发生死锁.请注意,代码包含

  1. 获得信号量
  2. 运行一些代码
  3. 释放信号量

如果该段代码使用另一个并行流,则死锁发生在2. 然后在OTHER流内发生死锁.因此,似乎不允许一起使用嵌套并行流和阻塞操作(如信号量)!

请注意,记录并行流使用ForkJoinPool并且ForkJoinPool和Semaphore属于同一个包 - java.util.concurrent(因此可以预期它们可以很好地互操作).

/*
 * (c) Copyright Christian P. Fries, …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing concurrency java-8 java-stream

21
推荐指数
2
解决办法
5721
查看次数

嵌套的Java 8并行forEach循环表现不佳.这种行为有望吗?

注意:我已经在另一个SO帖子中解决了这个问题 - 在嵌套的Java 8并行流动作中使用信号量可能是DEADLOCK.这是一个错误吗? - 但是这篇文章的标题表明问题与使用信号量有关 - 这有点分散了讨论的注意力.我正在创建这个,以强调嵌套循环可能有性能问题 - 虽然这两个问题可能是一个共同的原因(也许是因为我花了很多时间来弄清楚这个问题).(我不认为它是重复的,因为它强调另一种症状 - 但如果你只是删除它).

问题:如果嵌套两个Java 8 stream.parallel().forEach循环并且所有任务都是独立的,无状态的等等 - 除了提交到公共FJ池 - 然后在并行循环内嵌套并行循环执行得更差而不是在并行循环内嵌套顺序循环.更糟糕的是:如果同步包含内循环的操作,您将获得DEADLOCK.

演示性能问题

如果没有"同步",您仍然可以观察到性能问题.您可以在以下网址找到演示代码:http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java (有关更详细的说明,请参阅JavaDoc).

我们的设置如下:我们有一个嵌套的stream.parallel().forEach().

  • 内环是独立的(无状态,无干扰等 - 除了使用公共池之外)并且在最坏的情况下总共消耗1秒,即如果处理顺序.
  • 外循环的一半任务在该循环之前消耗10秒.
  • 在该循环之后,一半消耗10秒.
  • 因此,每个线程总共消耗11秒(最坏情况).*我们有一个布尔值,允许将内部循环从parallel()切换到sequential().

现在:将24个外循环任务提交给具有并行性的池8我们期望24/8*11 =最多33秒(在8核或更好的机器上).

结果是:

  • 内部顺序循环:33秒.
  • 内部并行循环:> 80秒(我有92秒).

问题:你能证实这种行为吗?这是人们对框架的期望吗?(我现在更加小心,声称这是一个错误,但我个人认为这是由于ForkJoinTask的实现中的一个错误.备注:我已将此发布到并发兴趣(请参阅http:// cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html),但到目前为止我没有得到确认).

证明了僵局

以下代码将为DEADLOCK

    // Outer loop
    IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
        doWork();
        synchronized(this) {
            // Inner loop
            IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
                doWork();
            });
        }
    });
Run Code Online (Sandbox Code Playgroud)

其中numberOfTasksInOuterLoop = 24,numberOfTasksInInnerLoop = 240,outerLoopOverheadFactor = 10000和 …

java parallel-processing concurrency java-8 java-stream

19
推荐指数
1
解决办法
7911
查看次数

如何(全局)替换Java并行流的公共线程池后端?

我想全局替换Java并行流默认使用的公共线程池,例如for

IntStream.range(0,100).parallel().forEach(i -> {
    doWork();
});
Run Code Online (Sandbox Code Playgroud)

我知道可以通过向专用线程池提交此类指令来使用专用的ForkJoinPool(请参阅Java 8并行流中的自定义线程池).这里的问题是

  • 是否有可能通过其他实现替换常见的ForkJoinPool(比如说Executors.newFixedThreadPool(10)
  • 是否可以通过某些全局设置来实现,例如,某些JVM属性?

备注:我之所以要更换F/J池,是因为它似乎有一个错误,使其无法用于嵌套并行循环.

嵌套并行循环的性能很差,可能导致死锁,请参阅http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

例如:以下代码导致死锁:

// Outer loop
IntStream.range(0,24).parallel().forEach(i -> {

    // (omitted:) do some heavy work here (consuming majority of time)

    // Need to synchronize for a small "subtask" (e.g. updating a result)
    synchronized(this) {
        // Inner loop (does s.th. completely free of side-effects, i.e. expected to work)
        IntStream.range(0,100).parallel().forEach(j -> {
            // do work here
        });
    }
});
Run Code Online (Sandbox Code Playgroud)

(即使在"在这里工作"没有任何额外的代码,因为并行性设置为<12).

我的问题是如何更换FJP.如果您想讨论嵌套并行循环,您可能会检查嵌套Java 8并行forEach循环执行不佳.这种行为有望吗?.

java parallel-processing concurrency multithreading stream

7
推荐指数
1
解决办法
1717
查看次数

Collection.parallelStream()是否意味着发生在之前的关系?

考虑这个(完全人为的)Java代码:

final List<Integer> s = Arrays.asList(1, 2, 3);
final int[] a = new int[1];
a[0] = 100;
s.parallelStream().forEach(i -> {
    synchronized (a) {
        a[0] += i;
    }
});
System.out.println(a[0]);
Run Code Online (Sandbox Code Playgroud)

此代码是否保证输出"106"?

似乎不存在,除非有一个先发生过的关系建立起来parallelStream(),通过它我们可以肯定地知道a[0]lambda 中的第一次访问将看到100而不是零(根据我对Java内存模型的理解).

Collection.parallelStream()没有记录建立这种关系......

可以询问完成parallelStream()方法调用的相同问题.

所以我错过了一些东西,或者为了正确性,上述代码需要看起来像这样:

final List<Integer> s = Arrays.asList(1, 2, 3);
final int[] a = new int[1];
synchronized (a) {
    a[0] = 100;
}
s.parallelStream().forEach(i -> {
    synchronized (a) {
        a[0] += i;
    }
});
synchronized (a) …
Run Code Online (Sandbox Code Playgroud)

java java-memory-model java-stream

6
推荐指数
1
解决办法
160
查看次数

限制并行流中并发评估数量(如固定线程池)的最佳/最优雅的方法是什么

假设 lambda 表达式消耗一定量的资源(例如内存),该资源是有限的,需要限制并发执行的数量(例如:如果 lambda 暂时消耗 100 MB(本地内存),我们希望将其限制为 1GB ,我们不允许超过 10 个并发评估)。

限制并发执行数量的最佳方法是什么,例如

IntStream.range(0, numberOfJobs).parallel().foreach( i -> { /*...*/ });
Run Code Online (Sandbox Code Playgroud)

注意:一个明显的选择是执行嵌套

    double jobsPerThread = (double)numberOfJobs / numberOfThreads;
    IntStream.range(0, numberOfThreads).parallel().forEach( threadIndex ->
        IntStream.range((int)(threadIndex * jobsPerThread), (int)((threadIndex+1) * jobsPerThread)).sequential().forEach( i -> { /*...*/ }));
Run Code Online (Sandbox Code Playgroud)

这是唯一的方法吗?Tt 没那么优雅。其实我想要一个

IntStream.range(0, numberOfJobs).parallel(numberOfThreads).foreach( i -> { /*...*/ });
Run Code Online (Sandbox Code Playgroud)

java concurrency java-8

5
推荐指数
1
解决办法
2745
查看次数