Java 8并行流如何在抛出的异常中运行?

Ali*_*aka 16 parallel-processing exception-handling exception java-8 java-stream

Java 8并行流如何在使用子句中的抛出异常上运行,例如在forEach处理中?例如,以下代码:

final AtomicBoolean throwException = new AtomicBoolean(true);
IntStream.range(0, 1000)
    .parallel()
    .forEach(i -> {
        // Throw only on one of the threads.
        if (throwException.compareAndSet(true, false)) {
            throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
        });
Run Code Online (Sandbox Code Playgroud)

它会立即停止处理元素吗?是否等待已经启动的元素完成?是否等待所有流完成?抛出异常后是否开始处理流元素?

什么时候回来?异常后立即?毕竟/部分元素是由消费者处理的?

在并行流引发异常后,是否继续处理元素?(发现这种情况发生的情况).

这里有一般规则吗?

编辑(15-11-2016)

试图确定并行流是否提前返回,我发现它不是确定的:

@Test
public void testParallelStreamWithException() {
    AtomicInteger overallCount = new AtomicInteger(0);
    AtomicInteger afterExceptionCount = new AtomicInteger(0);
    AtomicBoolean throwException = new AtomicBoolean(true);

    try {
        IntStream.range(0, 1000)
            .parallel()
            .forEach(i -> {
                overallCount.incrementAndGet();
                afterExceptionCount.incrementAndGet();
                try {
                    System.out.println(i + " Sleeping...");
                    Thread.sleep(1000);
                    System.out.println(i + " After Sleeping.");
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Throw only on one of the threads and not on main thread.
                if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {
                    System.out.println("Throwing exception - " + i);
                    throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
                }
            });
        Assert.fail("Should not get here.");
    }
    catch (Exception e) {
        System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
        afterExceptionCount.set(0);
    }
    System.out.println("Overall count: " + overallCount.get());
    System.out.println("After exception count: " + afterExceptionCount.get());
}
Run Code Online (Sandbox Code Playgroud)

不从主线投掷时迟到返回.这导致在抛出异常后处理许多元素.在我的机器上,抛出异常后处理了大约200个元素.但是,并非所有1000个元素都得到了处理.那么这里的规则是什么?为什么即使抛出异常也会处理更多元素?

删除not(!)符号时提前返回,导致异常被抛出主线程.只有已经启动的元素已完成处理,并且没有处理新的元素.这里的情况很早就回来了.与以前的行为不一致.

我在这里错过了什么?

Eug*_*ene 8

当在其中一个阶段抛出异常时,它不等待其他操作完成,将异常重新抛出给调用者。这就是 ForkJoinPool 处理它的方式。

相比之下, findFirst 例如在并行运行时,只会在所有操作完成处理后才将结果呈现给调用者(即使在需要完成所有操作之前就知道结果)。

换句话说:它会提前返回,但会让所有正在运行的任务完成。

编辑以回答最后一条评论

Holger 的回答(评论中的链接)对此进行了大量解释,但这里有一些细节。

1)当杀死所有但主线程时,你也杀死了所有应该由这些线程处理的任务。所以这个数字应该实际上是更各地250有1000个任务,4个线程,我认为这将返回3?:

int result = ForkJoinPool.getCommonPoolParallelism();
Run Code Online (Sandbox Code Playgroud)

理论上有 1000 个任务,有 4 个线程,每个线程应该处理 250 个任务,然后你杀死其中的 3 个意味着丢失 750 个任务。还有 250 个任务要执行,ForkJoinPool 将跨越 3 个新线程来执行这 250 个剩下的任务。

你可以尝试一些事情,像这样改变你的流(使流没有大小):

IntStream.generate(random::nextInt).limit(1000).parallel().forEach
Run Code Online (Sandbox Code Playgroud)

这一次,将会有更多的操作结束,因为初始拆分索引是未知的并且由其他一些策略选择。你也可以尝试改变这一点:

 if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {
Run Code Online (Sandbox Code Playgroud)

对此:

 if (!Thread.currentThread().getName().equals("main")) {
Run Code Online (Sandbox Code Playgroud)

这一次你总是会杀死除主线程之外的所有线程,直到某个时刻,由于任务太小而无法拆分,因此 ForkJoinPool 不会创建新线程,因此不需要其他线程。在这种情况下,完成的任务甚至更少。

2)你的第二个例子,当你真正杀死主线程时,就像代码一样,你不会看到其他线程的实际运行。更改 :

    } catch (Exception e) {
        System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
        afterExceptionCount.set(0);
    }

    // give some time for other threads to finish their work. You could play commenting and de-commenting this line to see a big difference in results. 
    TimeUnit.SECONDS.sleep(60);

    System.out.println("Overall count: " + overallCount.get());
    System.out.println("After exception count: " + afterExceptionCount.get());
Run Code Online (Sandbox Code Playgroud)

  • @AlikElzin-kilaka 不是真的,我认为这没有记录。我通过阅读其他一些引用此错误的 SO 问题来记住这一点:https://bugs.openjdk.java.net/browse/JDK-8164690 (3认同)
  • @AlikElzin-kilaka 在 core-libs-dev 邮件列表上还有 [this](http://mail.openjdk.java.net/pipermail/core-libs-dev/2016-August/042972.html) 讨论线程这导致了 Eugene 提到的 JBS 错误。 (2认同)
  • [这里](http://stackoverflow.com/questions/39261067/what-is-the-expected-behavior-when-a-java-8-stream-throw-a-runtimeexception/39275425#comment65883637_39261067)是一个较旧的参考. (2认同)