Jer*_*hou 2 java-8 java-stream
据我所知,并行流使用默认值ForkJoinPool.commonPool,默认情况下,线程数比处理器少一个.我想使用自己的自定义线程池.
像这样:
@Test
public void stream() throws Exception {
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (Exception e) {
}
return item * 10;
})).get().collect(Collectors.toList());
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);
}
Run Code Online (Sandbox Code Playgroud)
我的习惯ForkJoinPool从未使用过.我改变了默认的并行性,如下所示:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
Run Code Online (Sandbox Code Playgroud)
它运作良好 - 任务仅花费约1秒.
在我的应用程序中,该任务包含大量IO操作(从db读取数据).所以我需要更高的并行性,但我不想更改JVM属性.
那么指定我自己的正确方法是什么ForkJoinPool?
或者如何在IO密集型情况下使用并行流?
我假设您发现了这里描述的技巧:
其中指出
该技巧基于
ForkJoinTask.fork以下指定:“安排在当前任务正在运行的池中异步执行此任务(如果适用),或者使用ForkJoinPool.commonPool()if notinForkJoinPool()”
在您的代码中,parallelStream()和map(...)在 custom 中被调用ForkJoinPool,但Function传递给的map不是。
请记住,这Stream#map是一个中间操作。一旦终端操作被链接,它Function只会对其元素执行。就您而言,该终端操作是collect(...). 由于collect(Collectors.toList()是在线程中调用的main,因此map'sFunction会在 中的每个元素上并行调用commonPool。
您只需将collect(...)呼叫移至您的submit(...).
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (Exception e) {
}
return item * 10;
}).collect(Collectors.toList())).get();
Run Code Online (Sandbox Code Playgroud)
溪流很懒; 所有工作都在您开始终端操作时完成.在你的情况下,终端操作是.collect(Collectors.toList()),你在main线程中调用的结果get().因此,实际工作的完成方式与在main线程中构造整个流的方式相同.
要使池生效,必须将终端操作移动到提交的任务中:
ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (InterruptedException e) {}
return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);
Run Code Online (Sandbox Code Playgroud)
我们还可以通过在main线程中构造流并仅向池中提交终端操作来演示终端操作的相关性:
Stream<Integer> stream = testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (InterruptedException e) {}
return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();
Run Code Online (Sandbox Code Playgroud)
但是你应该记住,这是无证件的行为,这是不能保证的.实际答案必须是当前形式的Stream API,没有线程控制(并且没有帮助处理已检查的异常),不适合并行I/O操作.
| 归档时间: |
|
| 查看次数: |
2871 次 |
| 最近记录: |