如何为Java 8并行流指定ForkJoinPool?

Jer*_*hou 2 java-8 java-stream

据我所知,并行流使用默认值ForkJoinPool.commonPool,默认情况下,线程数比处理器少一个.我想使用自己的自定义线程池.

像这样:

@Test
public void stream() throws Exception {
    //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
    ForkJoinPool pool = new ForkJoinPool(10);
    List<Integer> testList = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
    long start = System.currentTimeMillis();
    List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
        try {
            // read from database
            Thread.sleep(1000);
            System.out.println("task" + item + ":" + Thread.currentThread());
        } catch (Exception e) {
        }
        return item * 10;
    })).get().collect(Collectors.toList());
    System.out.println(result);
    System.out.println(System.currentTimeMillis() - start);
}
Run Code Online (Sandbox Code Playgroud)

结果如下: 在此输入图像描述

我的习惯ForkJoinPool从未使用过.我改变了默认的并行性,如下所示:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
Run Code Online (Sandbox Code Playgroud)

它运作良好 - 任务仅花费约1秒.

在我的应用程序中,该任务包含大量IO操作(从db读取数据).所以我需要更高的并行性,但我不想更改JVM属性.

那么指定我自己的正确方法是什么ForkJoinPool

或者如何在IO密集型情况下使用并行流?

Sav*_*ior 6

我假设您发现了这里描述的技巧:

其中指出

该技巧基于ForkJoinTask.fork以下指定:“安排在当前任务正在运行的池中异步执行此任务(如果适用),或者使用ForkJoinPool.commonPool()if not inForkJoinPool()

在您的代码中,parallelStream()map(...)在 custom 中被调用ForkJoinPool,但Function传递给的map不是。

请记住,这Stream#map是一个中间操作。一旦终端操作被链接,它Function只会对其元素执行。就您而言,该终端操作是collect(...). 由于collect(Collectors.toList()是在线程中调用的main,因此map'sFunction会在 中的每个元素上并行调用commonPool

您只需将collect(...)呼叫移至您的submit(...).

List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (Exception e) {
    }
    return item * 10;
}).collect(Collectors.toList())).get();
Run Code Online (Sandbox Code Playgroud)


Hol*_*ger 6

溪流很懒; 所有工作都在您开始终端操作时完成.在你的情况下,终端操作是.collect(Collectors.toList()),你在main线程中调用的结果get().因此,实际工作的完成方式与在main线程中构造整个流的方式相同.

要使池生效,必须将终端操作移动到提交的任务中:

ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {}
    return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);
Run Code Online (Sandbox Code Playgroud)

我们还可以通过在main线程中构造流并仅向池中提交终端操作来演示终端操作的相关性:

Stream<Integer> stream = testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {}
    return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();
Run Code Online (Sandbox Code Playgroud)

但是你应该记住,这是无证件的行为,这是不能保证的.实际答案必须是当前形式的Stream API,没有线程控制(并且没有帮助处理已检查的异常),不适合并行I/O操作.