我试图将多个由大量数据支持的流合并为一个,然后缓冲它们。我可以毫无问题地将这些流折叠成一个项目流。但是,当我尝试缓冲/分块流时,它会尝试完全缓冲第一个流,这会立即填满我的内存。
我花了一段时间将问题缩小到最小测试用例,但下面有一些代码。
我可以重构一些东西,这样我就不会遇到这个问题,但是在不理解为什么会爆炸的情况下,我觉得使用流只是一个定时炸弹。
我从Java 8 Streams 上的 Buffer Operator那里获得了缓冲的灵感。
import java.util.*;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class BreakStreams
{
//@see https://stackoverflow.com/questions/47842871/buffer-operator-on-java-8-streams
/**
* Batch a stream into chunks
*/
public static <T> Stream<List<T>> buffer(Stream<T> stream, final long count)
{
final Iterator<T> streamIterator = stream.iterator();
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<List<T>>()
{
@Override public boolean hasNext()
{
return streamIterator.hasNext();
}
@Override public List<T> next()
{
List<T> intermediate = new ArrayList<>();
for (long v = 0; v < count && hasNext(); v++)
{
intermediate.add(streamIterator.next());
}
return intermediate;
}
}, 0), false);
}
public static void main(String[] args)
{
//create streams from huge datasets
Stream<Long> streams = Stream.of(LongStream.range(0, Integer.MAX_VALUE).boxed(),
LongStream.range(0, Integer.MAX_VALUE).boxed())
//collapse into one stream
.flatMap(x -> x);
//iterating over the stream one item at a time is OK..
// streams.forEach(x -> {
//buffering the stream is NOT ok, you will go OOM
buffer(streams, 25).forEach(x -> {
try
{
Thread.sleep(2500);
}
catch (InterruptedException ignore)
{
}
System.out.println(x);
});
}
}
Run Code Online (Sandbox Code Playgroud)
这似乎与旧问题“为什么在 flatMap() 之后的 filter() 在 Java 流中“不完全”懒惰?”。虽然该问题已针对 Stream 的内置操作修复,但当我们尝试从外部迭代平面映射流时,它似乎仍然存在。
我们可以简化代码来重现问题
Stream.of(LongStream.range(0, Integer.MAX_VALUE))
.flatMapToLong(x -> x)
.iterator().hasNext();
Run Code Online (Sandbox Code Playgroud)
请注意,使用Spliterator也会受到影响
Stream.of(LongStream.range(0, Integer.MAX_VALUE))
.flatMapToLong(x -> x)
.spliterator()
.tryAdvance((long l) -> System.out.println("first item: "+l));
Run Code Online (Sandbox Code Playgroud)
两者都尝试缓冲元素,直到最终以OutOfMemoryError.
由于spliterator().forEachRemaining(…)似乎没有受到影响,您可以实现一个适用于您的用例的解决方案forEach,但它会很脆弱,因为它仍然会出现短路流操作的问题。
public static <T> Stream<List<T>> buffer(Stream<T> stream, final int count) {
boolean parallel = stream.isParallel();
Spliterator<T> source = stream.spliterator();
return StreamSupport.stream(
new Spliterators.AbstractSpliterator<List<T>>(
(source.estimateSize()+count-1)/count, source.characteristics()
&(Spliterator.SIZED|Spliterator.DISTINCT|Spliterator.ORDERED)
| Spliterator.NONNULL) {
List<T> list;
Consumer<T> c = t -> list.add(t);
@Override
public boolean tryAdvance(Consumer<? super List<T>> action) {
if(list == null) list = new ArrayList<>(count);
if(!source.tryAdvance(c)) return false;
do {} while(list.size() < count && source.tryAdvance(c));
action.accept(list);
list = null;
return true;
}
@Override
public void forEachRemaining(Consumer<? super List<T>> action) {
source.forEachRemaining(t -> {
if(list == null) list = new ArrayList<>(count);
list.add(t);
if(list.size() == count) {
action.accept(list);
list = null;
}
});
if(list != null) {
action.accept(list);
list = null;
}
}
}, parallel);
}
Run Code Online (Sandbox Code Playgroud)
但请注意,Spliterator基于解决方案通常更可取,因为它们支持携带支持优化的附加信息,并且在许多用例中具有较低的迭代成本。因此,一旦在 JDK 代码中修复了此问题,这就是要走的路。
作为一种解决方法,您可以使用Stream.concat(…)组合流,但它在其文档中明确警告不要一次组合太多流:
从重复串联构造流时要小心。访问深度级联流的元素可能会导致深度调用链,甚至
StackOverflowException[原文如此]。
throwable 的名称已StackOverflowError在 Java 9 的文档中更正为