Tho*_*hle 16 java out-of-memory lazy-evaluation java-stream
我试图理解为什么下面的 Java 程序给出了OutOfMemoryError,而没有的相应程序没有.parallel()。
System.out.println(Stream
.iterate(1, i -> i+1)
.parallel()
.flatMap(n -> Stream.iterate(n, i -> i+n))
.mapToInt(Integer::intValue)
.limit(100_000_000)
.sum()
);
Run Code Online (Sandbox Code Playgroud)
我有两个问题:
这个程序的预期输出是什么?
没有.parallel()它似乎只是输出sum(1+2+3+...),这意味着它只是“卡在” flatMap 中的第一个流,这是有道理的。
使用并行我不知道是否有预期的行为,但我的猜测是它以某种方式交错了第一个n左右的流,n并行工作人员的数量在哪里。根据分块/缓冲行为,它也可能略有不同。
是什么导致它耗尽内存?我特别想了解这些流是如何在幕后实现的。
我猜有什么东西阻塞了流,所以它永远不会完成并且能够摆脱生成的值,但我不太清楚事物的评估顺序以及缓冲发生的位置。
编辑:如果相关,我使用的是 Java 11。
编辑 2:显然即使对于简单的程序也会发生同样的事情IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum(),所以它可能与limit而不是的懒惰有关flatMap。
Hol*_*ger 11
你说“但我不太清楚事物的评估顺序和缓冲发生的位置”,这正是并行流的意义所在。评估顺序未指定。
您示例的一个关键方面是.limit(100_000_000). 这意味着该实现不能仅对任意值求和,而必须对前 100,000,000 个数字求和。请注意,在参考实现中,.unordered().limit(100_000_000)不会更改结果,这表明无序情况没有特殊实现,但这是一个实现细节。
现在,当工作线程处理元素时,它们不能只是对它们进行总结,因为它们必须知道允许使用哪些元素,这取决于在它们的特定工作负载之前有多少元素。由于此流不知道大小,因此只有在处理了前缀元素时才能知道这点,而对于无限流则不会发生这种情况。所以工作线程暂时保持缓冲,这些信息变得可用。
原则上,当工作线程知道它正在处理最左边的¹工作块时,它可以立即对元素求和,对它们进行计数,并在达到限制时发出结束信号。因此 Stream 可能会终止,但这取决于很多因素。
在您的情况下,一个可能的情况是其他工作线程分配缓冲区的速度比最左边的作业计数的速度要快。在这种情况下,对时间的细微更改可能会使流偶尔返回一个值。
当我们减慢除处理最左边块的工作线程之外的所有工作线程时,我们可以使流终止(至少在大多数运行中):
System.out.println(IntStream
.iterate(1, i -> i+1)
.parallel()
.peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
.flatMap(n -> IntStream.iterate(n, i -> i+n))
.limit(100_000_000)
.sum()
);
Run Code Online (Sandbox Code Playgroud)
¹ 我遵循Stuart Marks 的建议,在谈论遭遇顺序而不是处理顺序时使用从左到右的顺序。
我最好的猜测是,将parallel()改变的内部行为flatMap(),其懒惰之前被评价已经出现了问题。
OutOfMemoryError在[JDK-8202307] Getting a java.lang.OutOfMemoryError: Java heap space when call Stream.iterator().next() 在 flatMap 中使用无限/非常大 Stream 的流中报告了您遇到的错误。如果您查看票证,它或多或少与您获得的堆栈跟踪相同。由于以下原因,该票证因无法修复而关闭:
在
iterator()与spliterator()方法“逃生舱”被使用时,它不是可以使用其它操作。它们有一些限制,因为它们将流实现的推模型转换为拉模型。在某些情况下,这种转换需要缓冲,例如当一个元素(平面)映射到两个或多个元素时。支持背压的概念来传达要通过元素生产的嵌套层拉动多少元素,这会使流实现显着复杂化,可能会以牺牲常见情况为代价。
| 归档时间: |
|
| 查看次数: |
1484 次 |
| 最近记录: |