在并行流上调用顺序会使所有先前的操作顺序进行

the*_*gin 13 java multithreading forkjoinpool java-stream

我有一个重要的数据集,并希望调用缓慢但干净的方法,而不是调用快速方法,副作用对第一个结果.我对中间结果不感兴趣,所以我不想收集它们.

显而易见的解决方案是创建并行流,进行慢速呼叫,再次使流顺序,并进行快速呼叫.问题是,所有代码都在单线程中执行,没有实际的并行性.

示例代码:

@Test
public void testParallelStream() throws ExecutionException, InterruptedException
{
    ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
    Set<String> threads = forkJoinPool.submit(()-> new Random().ints(100).boxed()
            .parallel()
            .map(this::slowOperation)
            .sequential()
            .map(Function.identity())//some fast operation, but must be in single thread
            .collect(Collectors.toSet())
    ).get();
    System.out.println(threads);
    Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size());
}

private String slowOperation(int value)
{
    try
    {
        Thread.sleep(100);
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    }
    return Thread.currentThread().getName();
}
Run Code Online (Sandbox Code Playgroud)

如果我删除sequential,代码按预期执行,但显然,非并行操作将在多个线程中调用.

你能推荐一些关于这种行为的引用,或者某些方法可以避免临时收集吗?

Tag*_*eev 16

在最初的Stream API设计中将流切换parallel()sequential()工作,但是引起了许多问题,最后实现被更改,因此它只是为整个管道打开和关闭并行标志.当前的文档确实含糊不清,但在Java-9中有所改进:

根据调用终端操作的流的模式,顺序或并行地执行流管道.可以使用该BaseStream.isParallel()方法确定流的顺序或并行模式,并且可以使用BaseStream.sequential()BaseStream.parallel()操作来修改流的模式.最近的顺序或并行模式设置适用于整个流管道的执行.

至于您的问题,您可以将所有内容收集到中间List并启动新的顺序管道:

new Random().ints(100).boxed()
        .parallel()
        .map(this::slowOperation)
        .collect(Collectors.toList())
        // Start new stream here
        .stream()
        .map(Function.identity())//some fast operation, but must be in single thread
        .collect(Collectors.toSet());
Run Code Online (Sandbox Code Playgroud)

  • 您引用的句子在Java 8版本中完全相同,可以在同一个地方找到,也就是类文档的最后一段.通常,您可以在[包文档](https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps)中找到更多信息(请参阅"并行性") )而不是[特定方法](https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#parallel--),而不仅仅是并行/顺序模式(例如,与减少相比较). (2认同)