生成无限并行流

Azb*_*iak 1 java java-8 rx-java java-stream

问题

嗨,我有一个函数,我将返回无限的并行流(是的,在这种情况下它要快得多)生成的结果。所以很明显(或不)我用过

Stream<Something> stream = Stream.generate(this::myGenerator).parallel()
Run Code Online (Sandbox Code Playgroud)

它有效,但是......当我想限制结果时它不会(当流是顺序的时一切都很好)。我的意思是,当我做类似的事情时,它会产生结果

stream.peek(System.out::println).limit(2).collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)

但即使peek输出产生超过 10 个元素,collect仍然没有最终确定(生成很慢,所以这 10个元素甚至可能需要一分钟)......这是一个简单的例子。实际上,限制这些结果是未来,因为主要期望是在用户终止进程之前只获得比最近的结果更好的结果(其他情况是首先返回我可以通过抛出异常findFirst所做的事情,如果没有其他帮助[没有,即使我在控制台上有更多元素并且在大约 30 秒内没有更多结果])。

所以,问题是...

如何复制?我的想法也是使用 RxJava,还有另一个问题 - 如何使用该工具(或其他工具)实现类似的结果。

代码示例

public Stream<Solution> generateSolutions() {
     final Solution initialSolution = initialSolutionMaker.findSolution();
     return Stream.concat(
          Stream.of(initialSolution),
          Stream.generate(continuousSolutionMaker::findSolution)
    ).parallel();
}

new Solver(instance).generateSolutions()
    .map(Solution::getPurpose)
    .peek(System.out::println)
    .limit(5).collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

实施findSolution并不重要。它有一些副作用,比如添加到解决方案仓库(singleton、sych 等),但仅此而已。

Hol*_*ger 5

已链接的答案中所述,高效并行流的关键点是使用已经具有固有大小的流源,而不是使用无大小甚至无限的流并对其应用 a limit。注入大小根本不适用于当前实现,同时确保已知大小不会丢失要容易得多。即使无法保留确切尺寸,例如在应用 时filter,尺寸仍会作为估计尺寸进行携带。

所以代替

Stream.generate(this::myGenerator).parallel()
      .peek(System.out::println)
      .limit(2)
      .collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)

只是使用

IntStream.range(0, /* limit */ 2).unordered().parallel()
         .mapToObj(unused -> this.myGenerator())
         .peek(System.out::println)
         .collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)

或者,更接近您的示例代码

public Stream<Solution> generateSolutions(int limit) {
    final Solution initialSolution = initialSolutionMaker.findSolution();
    return Stream.concat(
         Stream.of(initialSolution),
         IntStream.range(1, limit).unordered().parallel()
               .mapToObj(unused -> continuousSolutionMaker.findSolution())
   );
}

new Solver(instance).generateSolutions(5)
    .map(Solution::getPurpose)
    .peek(System.out::println)
    .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)