fge*_*fge 34 java parallel-processing java-8 java-stream
环境:Ubuntu x86_64(14.10),Oracle JDK 1.8u25
我尝试使用并行流Files.lines()但我想要.skip()第一行(它是带有标题的CSV文件).所以我试着这样做:
try (
final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8)
.skip(1L).parallel();
) {
// etc
}
Run Code Online (Sandbox Code Playgroud)
但是后来一列未能解析成一个int ...
所以我尝试了一些简单的代码.文件问题很简单:
$ cat info.csv
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
1422758875023;34;54;151;4375;4375;27486
$
Run Code Online (Sandbox Code Playgroud)
代码同样简单:
public static void main(final String... args)
{
final Path path = Paths.get("/home/fge/tmp/dd/info.csv");
Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel()
.forEach(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)
我系统地得到以下结果(好吧,我只运行了大约20次):
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
Run Code Online (Sandbox Code Playgroud)
我在这里错过了什么?
编辑似乎问题或误解比这更根深蒂固(下面的两个例子是由FreeNode的## java编写的):
public static void main(final String... args)
{
new BufferedReader(new StringReader("Hello\nWorld")).lines()
.skip(1L).parallel()
.forEach(System.out::println);
final Iterator<String> iter
= Arrays.asList("Hello", "World").iterator();
final Spliterator<String> spliterator
= Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED);
final Stream<String> s
= StreamSupport.stream(spliterator, true);
s.skip(1L).forEach(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)
这打印:
Hello
Hello
Run Code Online (Sandbox Code Playgroud)
呃.
@Holger建议对任何流ORDERED而不是SIZED这个其他样本的流发生这种情况:
Stream.of("Hello", "World")
.filter(x -> true)
.parallel()
.skip(1L)
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
此外,它源于已经发生的所有问题(如果它是一个?)的讨论.forEach()(正如@SotiriosDelimanolis首先指出的那样).
Nic*_*lai 18
这个答案已经过时了 - 请阅读这一个!
快速回答问题:观察到的行为是有意的!根据文档,没有错误,一切都在发生.但是,可以说,这种行为应该被记录下来并更好地传达.应该更明显地forEach忽略排序.
我将首先介绍允许观察到的行为的概念.这为解析问题中给出的一个例子提供了背景.我会在较高的水平上进行此操作,然后再在非常低的水平上进行.
[TL; DR:自己阅读,高级解释会给出一个粗略的答案.]
我们不谈论Streams,它是由流相关方法操作或返回的类型,而是讨论流操作和流管道.该方法调用lines,skip并且parallel是流操作,它构建流管道[1],并且 - 正如其他人已经注意到的那样 - 当forEach调用终端操作时,管道作为一个整体被处理[2].
管道可以被认为是一系列操作,一个接一个地在整个流上执行(例如,过滤所有元素,将剩余元素映射到数字,对所有数字求和).但这是误导!更好的比喻是终端操作通过每个操作拉出单个元素[3](例如,获取下一个未过滤的元素,映射它,将其添加到sum,请求下一个元素).一些中间操作可能需要在它们可以返回所请求的下一个元素之前遍历若干(例如skip)或甚至所有(例如sort)元素,并且这是操作中的状态源之一.
每个操作用这些信号表示其特征StreamOpFlag:
DISTINCTSORTEDORDEREDSIZEDSHORT_CIRCUIT它们在流源,中间操作和终端操作中组合在一起,构成管道的特征(作为一个整体),然后用于优化[4].类似地,管道是否并行执行是整个管道的属性[5].
因此,无论何时对这些特征做出假设,您都必须仔细查看构建管道的所有操作,无论它们的应用顺序如何,以及它们的保证.这样做时,请记住终端操作如何通过管道拉动每个单独的元素.
让我们来看看这个特例:
BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo\nBar"));
fooBarReader.lines()
.skip(1L)
.parallel()
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
无论您的流源是否有序(它是),通过调用forEach(而不是forEachOrdered)您声明该顺序对您无关 [6],这有效地skip从"跳过前n个元素"减少到"跳过任何n个元素"[7](因为没有顺序,前者变得毫无意义).
所以如果承诺加速,你就给管道权限忽略顺序.对于并行执行,它显然是这么认为的,这就是为什么你得到观察到的输出.因此,您观察到的是预期的行为,没有错误.
请注意,这与有状态不冲突skip!如上所述,有状态并不意味着它以某种方式缓存整个流(减去跳过的元素),并且随后的所有内容都在这些元素上执行.它只是意味着操作有一些状态 - 即跳过的元素的数量(好吧,它实际上并不那么容易,但由于我对正在发生的事情的理解有限,我认为它是一个公平的简化).
让我们更详细地看一下:
BufferedReader.lines创建Stream,让我们称之为_lines:
SpliteratorStreamSupport.stream,它创建一个ReferencePipeline.Head并将spliterator标志转换为流op标志.skip创建一个新的Stream,让我们称之为_skip:
ReferencePipeline.skipSliceOps.makeRefReferencePipeline.StatefulOp,该实例_lines作为其源引用.parallel 如上所述,为整个管道设置并行标志.forEach 实际上开始执行那么让我们看看管道是如何执行的:
_skip.forEach 创建一个ForEachOp(让它调用它_forEach)并交给它_skip.evaluate,它做两件事:
sourceSpliterator在此管道阶段的源周围创建一个spliterator:
opEvaluateParallelLazy自己(事实证明)UnorderedSliceSpliterator(让我们称之为_sliceSpliterator)skip = 1,但没有限制._forEach.evaluateParallel创建一个ForEachTask(因为它是无序的;让我们调用它_forEachTask)并调用它_forEachTask.compute任务中拆分前1024行,为它创建一个新任务(让我们称之为_forEachTask2),意识到没有行和完成._forEachTask2.compute被调用,徒劳地尝试再次拆分,最后System.out.println通过调用开始将其元素复制到接收器(一个流感知的包装器)_skip.copyInto._sliceSpliterator上面创建的!因此_sliceSpliterator.forEachRemaining负责将未跳过的元素处理到println-sink:
acquirePermits所以UnorderedSliceSpliterator.OfRef.forEachRemaining订单最终真正被忽略了.我没有将它与有序变体进行比较,但这是我为什么这样做的假设:
任何问题?;)抱歉这么久了.也许我应该省略细节并撰写博客文章....
流操作分为中间操作和终端操作,并组合成流管道.
在执行管道的终端操作之前,不会开始遍历管道源.
[3]这个比喻代表了我对溪流的理解.代码旁边的主要来源是java.util.stream- Stream操作和管道(突出显示我的):
懒惰地处理流可以显着提高效率; 在诸如上面的filter-map-sum示例的流水线中,过滤,映射和求和可以融合到数据的单个传递中,具有最小的中间状态.懒惰还允许在不必要时避免检查所有数据; 对于诸如"查找超过1000个字符的第一个字符串"之类的操作,只需要检查足够的字符串以找到具有所需特征的字符串,而不检查源中可用的所有字符串.
[4] java.util.stream.StreamOpFlag:
在流水线的每个阶段,可以计算组合的流和操作标志[... jadda,jadda,jadda,关于如何在源,中间和终端操作之间组合标志 ...]以产生从流水线输出的标志.然后可以使用这些标志来应用优化.
在代码中,您可以看到此内容AbstractPipeline.combinedFlags,它是在构造期间(以及在其他一些事件中)通过组合前一个操作和新操作的标志来设置的.
[5] java.util.stream- 并行性(我无法直接链接 - 向下滚动一点):
当启动终端操作时,根据调用它的流的方向,顺序地或并行地执行流管道.
在代码中,您可以看到这是在AbstractPipeline.sequential,parallel和isParallel,它在流源上设置/检查布尔标志,使得在构造流时调用setter时无关紧要.
[6] java.util.stream.Stream.forEach:
对此流的每个元素执行操作.[...]此操作的行为明确是不确定的.
与java.util.stream.Stream.forEachOrdered对比:
如果流具有已定义的遭遇顺序,则按流的遭遇顺序对此流的每个元素执行操作.
[7]这也没有明确记载,但我对此评论的解释Stream.skip(由我严重缩短):
[...] skip()[...]在有序并行管道上可能相当昂贵[...]因为skip(n)被约束为不仅跳过任何n个元素,而是跳过遭遇顺序中的前n个元素.[...] [R]提供排序约束[...]可能会导致并行管道中skip()的显着加速
Hol*_*ger 17
由于问题的当前状态与此处所做的早期陈述完全相反,应该注意的是,Brian Goetz现在有一个关于无序特征反向传播的明确陈述skip被认为是一个错误.它还表示现在认为它根本没有对终端操作的有序性进行反向传播.
还有一个相关的错误报告,JDK-8129120,其状态是"在Java 9中修复",并且它被反向移植到Java 8,更新60
我做了一些测试,jdk1.8.0_60似乎现在的实现确实表现出更直观的行为.
问题是您正在使用并行流和forEach,并且您期望跳过操作依赖于正确的元素顺序,而这不是这里的情况.摘自forEach文档:
对于并行流管道,此操作不保证遵守流的遭遇顺序,因为这样做会牺牲并行性的好处.
我猜基本上发生的事情是跳过操作首先在第二行执行,而不是在第一行执行.如果你使用流顺序或使用forEachOrdered,你可以看到它然后产生预期的结果.另一种方法是使用收集器.
| 归档时间: |
|
| 查看次数: |
2842 次 |
| 最近记录: |