最大化与Java 8 Streams的并行性

wvd*_*vdz 1 java java-stream

Java Streams基于硬件上的并行数量.但是如果我想要始终拥有最大的并行度呢?

请考虑以下代码.我希望10个任务中的每个任务同时运行100毫秒.

long runUntil = System.currentTimeMillis() + 100;
IntStream.range(0, 10).parallel().forEach(i ->
{
    int cnt = 0;
    while(System.currentTimeMillis() < runUntil)
        cnt++;
    System.out.println(i + ": " + cnt);
});
Run Code Online (Sandbox Code Playgroud)

但是,我得到的结果是:

2: 56443
1: 67506
4: 74693
6: 70549
0: 0
3: 0
5: 0
7: 0
8: 0
9: 0
Run Code Online (Sandbox Code Playgroud)

因此,只有4个任务并行执行,第五个任务仅在前4个任务中的一个完成时开始.我希望所有任务大约在同一时间开始,而不是等待彼此.

我不同意它是Java 8并行流中自定义线程池的副本,因为这个问题是关于阻止其他任务的慢速运行任务,而在我的情况下,我只是想知道如何(如果可以)最大化使用Stream API时的并行性.

Eug*_*ene 5

当您执行并行流时,您可以调用ForkJoinPool,该池具有的工作线程数等于以下结果:

 Runtime.getRuntime().availableProcessors(); // 4 in your case
Run Code Online (Sandbox Code Playgroud)

所以并行任务由4个线程同时执行.

当你开始第5个任务(100毫秒已经过去)时,所以这个条件:

  while(System.currentTimeMillis() < runUntil)
Run Code Online (Sandbox Code Playgroud)

报告错误,因此仅为零.

要解决此问题,您可以自己创建一个ForkJoinPool,如本答案中所述(/sf/answers/1558884491/)

long runUntil = System.currentTimeMillis() + 1000;
ForkJoinPool forkJoinPool = new ForkJoinPool(10); // 10 Threads
forkJoinPool.submit(() ->
IntStream.range(0, 10).parallel().forEach(i -> {
    int cnt = 0;
    while (System.currentTimeMillis() < runUntil)
        cnt++;
    System.out.println(i + ": " + cnt);
})).get();
Run Code Online (Sandbox Code Playgroud)

  • @wvdz在生产环境中,您需要手动触发线程,以便人们知道发生了什么.你可以`mapToObject`并在那里创建线程. (2认同)