由于底层的ForkJoin实现,Java 8流可以使无限数据的O(1)内存减少成为O(n)内存

spl*_*spl 15 java memory java-8

我编写了一个流实现,它在文件的行上执行四个简单的缩减(+和<).

起初我执行了四个流,但我决定编写自己的累加器和组合器,以便我可以在一个流中执行所有四个缩减.在小型数据集(10,000,000行)上,运行时间按预期减少到大约1/4,在我的硬件上运行14秒.

fileIn = new BufferedReader(new InputStreamReader(
            new URL(args[0].trim()).openStream()));

final Results results = fileIn.lines()
        .parallel()
        .skip(1)
        .map(User::parse)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Results::new, Results::accumulate, Results::combine);
Run Code Online (Sandbox Code Playgroud)

Results::accumulate并且分别Results::combine将用户正确地组合到结果和结果与结果中,并且此实现适用于小数据集.

我也试过使用.reduce(),结果很相似,但我试图.collect()减少短期对象的创建.

问题在于,当我使用具有10亿行的真实大小的数据时,我遇到的问题表明Java 8流无法完成任务.在JConsole中观察堆内存,以大致线性方式爬升到分配的12 GB,然后是OOM.

我的印象是收集器或减速器的性能可以与迭代解决方案相媲美,迭代解决方案应该受CPU和IO的限制而不是内存,因为减少步骤产生的结果不会增长,而是减少!

当我进行堆转储并将其放入jhat时,我看到大约7GB被字符串占用,并且这些字符串可以清楚地看作是输入文件的行.我觉得它们根本不应该在内存中,但是jhat显示了一个非常大的ForkJoin相关结构在内存中累积:

Static reference from java.util.concurrent.ForkJoinPool.common (from class java.util.concurrent.ForkJoinPool) :

--> java.util.concurrent.ForkJoinPool@0x786d41db0 (76 bytes) (field workQueues:)
--> [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598 (144 bytes) (Element 3 of [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598:)
--> java.util.concurrent.ForkJoinPool$WorkQueue@0x786d41ee8 (96 bytes) (field currentSteal:)
--> java.util.stream.SliceOps$SliceTask@0x7b4ac6cb0 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b379ad18 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b25bdb68 (130 bytes) (field leftChild:)
--> java.util.stream.SliceOps$SliceTask@0x7b379acb8 (130 bytes) (field localResult:)
--> java.util.stream.Nodes$SpinedNodeBuilder@0x7b25fdda0 (53 bytes) (field spine:)
--> [[Ljava.lang.Object;@0x7b25ffe48 (144 bytes) (Element 12 of [[Ljava.lang.Object;@0x7b25ffe48:)
--> [Ljava.lang.Object;@0x7b37c4f20 (262160 bytes) (Element 19598 of [Ljava.lang.Object;@0x7b37c4f20:)
--> 31ea87ba876505645342b31928394b3c,2013-11-24T23:02:17+00:00,898,22200,1314,700 (28 bytes) (field value:)
--> [C@0x7b2ffff88 (170 bytes) // <<<< There are thousands of these
Run Code Online (Sandbox Code Playgroud)

ApplicationShutdownHooks,Local references和System Classes中还有其他引用,但是我展示的这个问题是问题的关键,它会导致内存增长O(n)

流实现是否通过保存ForkJoin类中的所有字符串来使这个O(1)内存问题O(n)内存?我爱溪流,我不希望这样:(

spl*_*spl 1

感谢 Marko Topolnik 和 Holger 给出了正确答案。虽然都没有发布一个答案让我接受,所以我会尝试将其与子孙后代联系起来:)

在并行流上这.skip(1)是非常昂贵的,因为它需要排序以精确跳过第一个条目,根据Stream.skip() 的 Javadoc

在调用 BufferedReader 之前读取.lines()它的第一行确实成功地跳过了我的实现中的第一行。

然后删除.skip()解决了内存问题,并且在 JConsole 中观察到,即使程序处理 10 亿行,每次垃圾回收时也会很好地反弹并返回到 < 1GB。这是理想的行为,对于我的目的而言,它足够接近 O(1) 内存。

与上面的建议相反,.parallel()和的相对位置.skip(1)并不重要,您不能对它们重新排序以使其.skip(1)在“之前”发生.parallel().。构建器模式表明排序很重要,它适用于其他中间操作,但不适用于此一。我记得我的 OCP 认证材料中的这一微妙之处,但它似乎没有出现在 Javadoc 中,因此没有参考资料。不过,我通过进行孤立的更改并观察 JConsole 中的回归以及相关的 OOM,通过实验证实了这一点。