从并行流中捕获异常

fin*_*usl 7 java parallel-processing exception-handling java-8 java-stream

我有一堆列作为csv文件中的字符串数组.现在我想解析它们.由于这种解析需要日期解析和其他不那么快的解析技术,我正在考虑并行性(我计时,它需要一些时间).我的简单方法:

Stream.of(columns).parallel().forEach(column -> 
    result[column.index] = parseColumn(valueCache[column.index], column.type));
Run Code Online (Sandbox Code Playgroud)

列包含的ColumnDescriptor元素只有两个属性,即要解析的列索引和定义如何解析它的类型.没有其他的.result是一个Object数组,它接受结果数组.

问题是现在解析函数抛出ParseException,我进一步处理调用堆栈.既然我们在这里并行,它就不能被抛出.处理这个问题的最佳方法是什么?

我有这个解决方案,但我有点畏缩阅读它.什么是更好的方法呢?

final CompletableFuture<ParseException> thrownException = new CompletableFuture<>();
Stream.of(columns).parallel().forEach(column -> {
    try {
        result[column.index] = parseColumn(valueCache[column.index], column.type);
    } catch (ParseException e) {
        thrownException.complete(e);
    }});

if(thrownException.isDone())
    //only can be done if there is a value set.
    throw thrownException.getNow(null);
Run Code Online (Sandbox Code Playgroud)

注意:我不需要所有例外.如果我按顺序解析它们,我也只会得到一个.这样就可以了.

Hol*_*ger 6

问题是你错误的前提"因为我们在这里是并行的,它不能被抛出."没有规范禁止在并行处理中抛出异常.您可以像在顺序流中一样将该异常抛出到并行流中,将其包装在未经检查的异常中(如果它是已检查的异常).

如果线程中至少抛出一个异常,则forEach调用会将其(或其中一个)传播给调用者.

您可能遇到的唯一问题是,当前实现在遇到异常时不会等待所有线程的完成.这可以解决使用

try {
    Arrays.stream(columns).parallel()
        .forEach(column -> 
            result[column.index] = parseColumn(valueCache[column.index], column.type));
} catch(Throwable t) {
    ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.MINUTES);
    throw t;
}
Run Code Online (Sandbox Code Playgroud)

但通常情况下,您不需要它,因为在特殊情况下您不会访问同时处理的结果.

  • @findusl:很有意思的是,他告诉您,在以后的一生中避免某事,只是因为某人某天对某事说得不好,所以听起来很教条。而且,您避免这种情况的尝试甚至在语义上和技术上都没有改变。parseColumn在多线程执行中仍会引发异常,有人会捕获该异常并将其移交给作业启动线程。为什么在手动执行而不是让Stream框架执行时会更好呢? (2认同)

joh*_*384 2

我认为问题更多的是,你在串行解析时通常会做什么?

你会在第一个异常时停止,并停止整个过程吗?在这种情况下,请将异常包装在运行时异常中,然后让流中止并抛出它。捕获包装器异常,解开它并处理它。

你会跳过不良记录吗?然后,要么 1. 跟踪列表中某处的错误,要么 2. 创建一个包装器对象,该对象可以保存解析结果或错误(不跟踪异常本身,只跟踪描述错误所需的最少异常)。

之后检查第一个选项的列表中是否有错误,或者以不同的方式显示第二个选项的有错误的记录。