Java Streams - 缓冲巨大的流

Mic*_*tin 5 java java-stream

我试图将多个由大量数据支持的流合并为一个,然后缓冲它们。我可以毫无问题地将这些流折叠成一个项目流。但是,当我尝试缓冲/分块流时,它会尝试完全缓冲第一个流,这会立即填满我的内存。

我花了一段时间将问题缩小到最小测试用例,但下面有一些代码。

我可以重构一些东西,这样我就不会遇到这个问题,但是在不理解为什么会爆炸的情况下,我觉得使用流只是一个定时炸弹。

我从Java 8 Streams 上的 Buffer Operator那里获得了缓冲的灵感。

import java.util.*;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class BreakStreams
{

   //@see https://stackoverflow.com/questions/47842871/buffer-operator-on-java-8-streams
   /**
    * Batch a stream into chunks
    */
   public static <T> Stream<List<T>> buffer(Stream<T> stream, final long count)
   {
      final Iterator<T> streamIterator = stream.iterator();

      return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<List<T>>()
      {
         @Override public boolean hasNext()
         {
            return streamIterator.hasNext();
         }

         @Override public List<T> next()
         {
            List<T> intermediate = new ArrayList<>();
            for (long v = 0; v < count && hasNext(); v++)
            {
               intermediate.add(streamIterator.next());
            }
            return intermediate;
         }
      }, 0), false);
   }

   public static void main(String[] args)
   {

      //create streams from huge datasets
      Stream<Long> streams = Stream.of(LongStream.range(0, Integer.MAX_VALUE).boxed(),
                                       LongStream.range(0, Integer.MAX_VALUE).boxed())
                                   //collapse into one stream
                                   .flatMap(x -> x);
      //iterating over the stream one item at a time is OK..
//      streams.forEach(x -> {

      //buffering the stream is NOT ok, you will go OOM
      buffer(streams, 25).forEach(x -> {
         try
         {
            Thread.sleep(2500);
         }
         catch (InterruptedException ignore)
         {
         }
         System.out.println(x);
      });
   }
}

Run Code Online (Sandbox Code Playgroud)

Hol*_*ger 6

这似乎与旧问题“为什么在 flatMap() 之后的 filter() 在 Java 流中“不完全”懒惰?”。虽然该问题已针对 Stream 的内置操作修复,但当我们尝试从外部迭代平面映射流时,它似乎仍然存在。

我们可以简化代码来重现问题

Stream.of(LongStream.range(0, Integer.MAX_VALUE))
    .flatMapToLong(x -> x)
    .iterator().hasNext();
Run Code Online (Sandbox Code Playgroud)

请注意,使用Spliterator也会受到影响

Stream.of(LongStream.range(0, Integer.MAX_VALUE))
    .flatMapToLong(x -> x)
    .spliterator()
    .tryAdvance((long l) -> System.out.println("first item: "+l));
Run Code Online (Sandbox Code Playgroud)

两者都尝试缓冲元素,直到最终以OutOfMemoryError.

由于spliterator().forEachRemaining(…)似乎没有受到影响,您可以实现一个适用于您的用例的解决方案forEach,但它会很脆弱,因为它仍然会出现短路流操作的问题。

public static <T> Stream<List<T>> buffer(Stream<T> stream, final int count) {
    boolean parallel = stream.isParallel();
    Spliterator<T> source = stream.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<List<T>>(
            (source.estimateSize()+count-1)/count, source.characteristics()
                &(Spliterator.SIZED|Spliterator.DISTINCT|Spliterator.ORDERED)
                    | Spliterator.NONNULL) {
            List<T> list;
            Consumer<T> c = t -> list.add(t);
            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                if(list == null) list = new ArrayList<>(count);
                if(!source.tryAdvance(c)) return false;
                do {} while(list.size() < count && source.tryAdvance(c));
                action.accept(list);
                list = null;
                return true;
            }
            @Override
            public void forEachRemaining(Consumer<? super List<T>> action) {
                source.forEachRemaining(t -> {
                    if(list == null) list = new ArrayList<>(count);
                    list.add(t);
                    if(list.size() == count) {
                        action.accept(list);
                        list = null;
                    }
                });
                if(list != null) {
                    action.accept(list);
                    list = null;
                }
            }
        }, parallel);
}
Run Code Online (Sandbox Code Playgroud)

但请注意,Spliterator基于解决方案通常更可取,因为它们支持携带支持优化的附加信息,并且在许多用例中具有较低的迭代成本。因此,一旦在 JDK 代码中修复了此问题,这就是要走的路。

作为一种解决方法,您可以使用Stream.concat(…)组合流,但它在其文档中明确警告不要一次组合太多流:

从重复串联构造流时要小心。访问深度级联流的元素可能会导致深度调用链,甚至StackOverflowException[原文如此]。

throwable 的名称已StackOverflowError在 Java 9 的文档中更正为