Java Streams - 有效地对排序流上的项目进行分组

Eya*_*der 8 java java-8 java-stream

我正在寻找一种方法来实现非终端分组操作,这样内存开销将是最小的.

例如,考虑distinct().在一般情况下,它别无选择,只能收集所有不同的项目,然后才向前流式传输.但是,如果我们知道输入流已经排序,则可以使用最少的内存"在运行中"完成操作.

我知道我可以使用迭代器包装器并自己实现分组逻辑来实现迭代器.是否有更简单的方法来使用流API来实现它?

- 编辑 -

我发现了一种滥用Stream.flatMap(..)来实现此目的的方法:

  private static class DedupSeq implements IntFunction<IntStream> {
    private Integer prev;

    @Override
    public IntStream apply(int value) {
      IntStream res = (prev != null && value == prev)? IntStream.empty() : IntStream.of(value);
      prev = value;
      return res;
    }    
  }
Run Code Online (Sandbox Code Playgroud)

然后:

IntStream.of(1,1,3,3,3,4,4,5).flatMap(new DedupSeq()).forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

哪个印刷品:

1
3
4
5
Run Code Online (Sandbox Code Playgroud)

通过一些更改,相同的技术可用于任何类型的存储器有效的流序列分组.无论如何,我不太喜欢这个解决方案,而且我正在寻找更自然的东西(比如映射或过滤工作的方式).此外,我在这里违约,因为提供给flatMap(..)的函数是有状态的.

Hol*_*ger 4

如果您想要一个不将可变状态添加到不应该具有可变状态的函数的解决方案,您可以求助于collect

\n\n
static void distinctForSorted(IntStream s, IntConsumer action) {\n    s.collect(()->new long[]{Long.MIN_VALUE},\n              (a, i)->{ if(a[0]!=i) { action.accept(i); assert i>a[0]; a[0]=i; }},\n              (a, b)->{ throw new UnsupportedOperationException(); });\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

这是因为它是使用可变容器的预期方式,但是,它不能并行工作,因为在任意流位置进行拆分意味着可能会在两个(甚至更多)线程中遇到一个值。

\n\n

如果您想要通用目的IntStream而不是forEach操作,Spliterator则首选低级解决方案,尽管会增加复杂性。

\n\n
static IntStream distinctForSorted(IntStream s) {\n    Spliterator.OfInt sp=s.spliterator();\n    return StreamSupport.intStream(\n      new Spliterators.AbstractIntSpliterator(sp.estimateSize(),\n      Spliterator.DISTINCT|Spliterator.SORTED|Spliterator.NONNULL|Spliterator.ORDERED) {\n        long last=Long.MIN_VALUE;\n        @Override\n        public boolean tryAdvance(IntConsumer action) {\n            long prev=last;\n            do if(!sp.tryAdvance(distinct(action))) return false; while(prev==last);\n            return true;\n        }\n        @Override\n        public void forEachRemaining(IntConsumer action) {\n            sp.forEachRemaining(distinct(action));\n        }\n        @Override\n        public Comparator<? super Integer> getComparator() {\n            return null;\n        }\n        private IntConsumer distinct(IntConsumer c) {\n            return i-> {\n                if(i==last) return;\n                assert i>last;\n                last=i;\n                c.accept(i);\n            };\n        }\n    }, false);\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

它甚至继承了并行支持,尽管它的工作原理是在另一个线程中处理某些值之前预取一些值,因此它不会加速不同的操作,但如果存在计算密集型操作,则可能会加速后续操作。

\n\n
\n\n

为了完成,这里有一个针对任意(即未排序)的不同操作,IntStream它不依赖于 \xe2\x80\x99t 依赖于 \xe2\x80\x9cboxing 加上HashMap\xe2\x80\x9d 因此可能具有更好的内存占用:

\n\n
static IntStream distinct(IntStream s) {\n    boolean parallel=s.isParallel();\n    s=s.collect(BitSet::new, BitSet::set, BitSet::or).stream();\n    if(parallel) s=s.parallel();\n    return s;\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

它仅适用于正值int;将其扩展到完整的 32\xc2\xa0bit 范围将需要两个BitSets,因此看起来不那么简洁,但用例通常允许将存储限制在 31\xc2\xa0bit 范围甚至更低\xe2\x80\xa6

\n