从并行流中收集结果

Fan*_*Fan 3 java parallel-processing java-8 java-stream

我有一段这样的代码:

List<Egg> eggs = hens.parallelStream().map(hen -> {
    ArrayList<Egg> eggs = new ArrayList<>();
    while (hen.hasEgg()) {
        eggs.add(hen.getEgg());
    }
    return eggs;
}).flatMap(Collection::stream).collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

但是通过这种方式,我必须为每只母鸡创建一个ArrayList,并且在母鸡100%处理之前不会收集鸡蛋.我想要这样的东西:

List<Egg> eggs = hens.parallelStream().map(hen -> {
    while (hen.hasEgg()) {
        yield return hen.getEgg();
    }
}).collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

但Java没有收益率.有没有办法实现它?

Tag*_*eev 8

您的Hen课程很难适应Stream API.如果您无法更改它并且没有其他有用的方法(例如Collection<Egg> getAllEggs()Iterator<Egg> eggIterator()),您可以创建如下的蛋流:

public static Stream<Egg> eggs(Hen hen) {
    Iterator<Egg> it = new Iterator<Egg>() {
        @Override
        public boolean hasNext() {
            return hen.hasEgg();
        }

        @Override
        public Egg next() {
            return hen.getEgg();
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false);
}
Run Code Online (Sandbox Code Playgroud)

现在您可以通过以下方式使用它:

List<Egg> eggs = hens.parallelStream()
                     .flatMap(hen -> eggs(hen))
                     .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

当然,Stream如果您可以更改课程,则可以实现更好的实施Hen.

  • @ user2316040类似迭代器结构的常见问题 - 在本例中为hasEgg/getEgg - 是检查逻辑和获取逻辑重叠,因此它们可能必须以一种不舒服的方式共享状态.您可以考虑通过Spliterators.AbstractSpliterator创建流.这要求您只需覆盖tryAdvance(),它将检查和返回操作融合到一个方法中. (2认同)
  • 具有讽刺意味的是,Spliterator以这种方式创建了尝试提供并行支持,猜测是什么,在分裂时将值缓冲到数组中,因此所有尝试提供延迟提取的工作都会消失. (2认同)