分区Java 8流

Tra*_*001 55 java functional-programming java-8 java-stream

如何在Java 8 Stream上实现"分区"操作?通过分区,我的意思是,将流划分为给定大小的子流.不知何故,它将与Guava Iterators.partition()方法完全相同,只是希望分区是懒惰评估的Streams而不是List的.

Tag*_*eev 40

将任意源流分区为固定大小的批次是不可能的,因为这会搞砸并行处理.并行处理时,您可能不知道拆分后第一个子任务中有多少元素,因此在第一个子任务完全处理之前,您无法为下一个子任务创建分区.

但是,可以从随机访问创建分区流List.例如,在我的StreamEx库中可以使用这样的功能:

List<Type> input = Arrays.asList(...);

Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);
Run Code Online (Sandbox Code Playgroud)

或者如果你真的想要流的流:

Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);
Run Code Online (Sandbox Code Playgroud)

如果您不想依赖第三方库,可以ofSubLists手动实现此类方法:

public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
    if (length <= 0)
        throw new IllegalArgumentException("length = " + length);
    int size = source.size();
    if (size <= 0)
        return Stream.empty();
    int fullChunks = (size - 1) / length;
    return IntStream.range(0, fullChunks + 1).mapToObj(
        n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}
Run Code Online (Sandbox Code Playgroud)

这个实现看起来有点长,但它考虑了一些极端情况,例如close-to-MAX_VALUE列表大小.


如果你想要无序流的并行友好解决方案(所以你不关心哪个流元素将在单个批处理中组合),你可以像这样使用收集器(感谢@sibnick的灵感):

public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
                   Collector<List<T>, A, R> downstream) {
    class Acc {
        List<T> cur = new ArrayList<>();
        A acc = downstream.supplier().get();
    }
    BiConsumer<Acc, T> accumulator = (acc, t) -> {
        acc.cur.add(t);
        if(acc.cur.size() == batchSize) {
            downstream.accumulator().accept(acc.acc, acc.cur);
            acc.cur = new ArrayList<>();
        }
    };
    return Collector.of(Acc::new, accumulator,
            (acc1, acc2) -> {
                acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                for(T t : acc2.cur) accumulator.accept(acc1, t);
                return acc1;
            }, acc -> {
                if(!acc.cur.isEmpty())
                    downstream.accumulator().accept(acc.acc, acc.cur);
                return downstream.finisher().apply(acc.acc);
            }, Collector.Characteristics.UNORDERED);
}
Run Code Online (Sandbox Code Playgroud)

用法示例:

List<List<Integer>> list = IntStream.range(0,20)
                                    .boxed().parallel()
                                    .collect(unorderedBatches(3, Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)

结果:

[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]
Run Code Online (Sandbox Code Playgroud)

这种收集器是完全线程安全的,并为顺序流生成有序批次.

如果要为每个批次应用中间转换,可以使用以下版本:

public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
        Collector<T, AA, B> batchCollector,
        Collector<B, A, R> downstream) {
    return unorderedBatches(batchSize, 
            Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}
Run Code Online (Sandbox Code Playgroud)

例如,通过这种方式,您可以动态地汇总每个批次中的数字:

List<Integer> list = IntStream.range(0,20)
        .boxed().parallel()
        .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), 
            Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)


Joh*_*ean 8

如果您想要按顺序使用Stream,则可以对Stream进行分区(以及执行相关的功能,例如窗口化 - 我认为这是您在这种情况下真正需要的功能).两个支持标准流的分段的库是独眼巨人反应(我是作者)和循环反应扩展的jOOλ(添加诸如Windowing之类的功能).

cyclops-streams有一组用于在Java Streams上运行的静态函数StreamUtils,以及一系列函数,如splitAt,headAndTail,splitBy,用于分区的分区.

要将Stream窗口化为大小为30的嵌套流的流,您可以使用窗口方法.

对于OPs点,在Streaming术语中,将Stream拆分为给定大小的多个Streams是一个Windowing操作(而不是分区操作).

  Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30);
Run Code Online (Sandbox Code Playgroud)

有一个名为ReactiveSeq的Stream扩展类,它扩展了jool.Seq并添加了Windowing功能,可以使代码更清晰.

  ReactiveSeq<Integer> seq;
  ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30);
Run Code Online (Sandbox Code Playgroud)

正如Tagir指出的那样,这不适合并行Streams.如果您想要以多线程方式窗口或批处理您希望执行的Stream.cyclops -react中的 LazyFutureStream 可能很有用(Windowing在待办事项列表中,但现在可以使用普通的旧批处理).

在这种情况下,数据将从执行Stream的多个线程传递到Multi-Producer/Single-Consumer无等待队列,并且来自该队列的顺序数据可以在再次分发到线程之前被窗口化.

  Stream<List<Data>> batched = new LazyReact().range(0,1000)
                                              .grouped(30)
                                              .map(this::process);
Run Code Online (Sandbox Code Playgroud)


War*_*oth 8

我找到了一个优雅的解决方案: Iterable parts = Iterables.partition(stream::iterator, size)

  • Iterables 来自 Guava:https://github.com/google/guava/blob/master/guava/src/com/google/common/collect/Iterables.java 一些开发人员可能有充分的理由不使用它。您应该提及您使用的第三方库。 (4认同)

Tra*_*001 7

好像Jon Skeet在他的评论中所表明的那样,不可能让分区变得懒散.对于非延迟分区,我已经有了这个代码:

public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
    final Iterator<T> it = source.iterator();
    final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream);
    final Iterable<Stream<T>> iterable = () -> partIt;

    return StreamSupport.stream(iterable.spliterator(), false);
}
Run Code Online (Sandbox Code Playgroud)

  • 我知道这是一个古老的话题,但它认为值得一提 - 它不是纯粹的Java 8:`Iterators`类来自Guava. (16认同)

小智 5

这是一个纯 Java 解决方案,它是延迟计算的,而不是使用 List。

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
    List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
      .sequential()                   
      .map(new Function<T, List<T>>(){
          public List<T> apply(T t){
              currentBatch.get(0).add(t);
              return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
            }
      }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                .limit(1)
    ).filter(Objects::nonNull);
}
Run Code Online (Sandbox Code Playgroud)

该方法返回Stream<List<T>>灵活性。您可以Stream<Stream<T>>通过 轻松将其转换为partition(something, 10).map(List::stream).