Java不使用所有可用的CPU

BKE*_*BKE 13 java concurrency multithreading fork-join java-8

我有一个长期运行的计算,我需要执行一长串输入.计算是独立的,所以我想将它们分配给几个CPU.我使用的是Java 8.

代码的框架如下所示:

ExecutorService executorService = Executors.newFixedThreadPool(numThreads);

MyService myService = new MyService(executorService);

List<MyResult> results =
            myInputList.stream()
                     .map(myService::getResultFuture)
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());

executorService.shutdown();
Run Code Online (Sandbox Code Playgroud)

负责计算的主要功能如下:

CompletableFuture<MyResult> getResultFuture(MyInput input) {
    return CompletableFuture.supplyAsync(() -> longCalc(input), executor)))
}
Run Code Online (Sandbox Code Playgroud)

长时间运行的计算是无状态的,并且不执行任何IO.

我希望这段代码能够使用所有可用的CPU,但它不会发生.例如,在具有72个CPU和numThreads=72(或甚至例如numThreads=500)的机器上,CPU使用率最多为500-1000%,如htop所示:

HTOP

根据线程转储,许多计算线程都在等待,即:

"pool-1-thread-34" #55 prio=5 os_prio=0 tid=0x00007fe858597890 nid=0xd66 waiting on condition [0x00007fe7f9cdd000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000381815f20> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - None
Run Code Online (Sandbox Code Playgroud)

所有计算线程都在等待同一个锁.在转储时,只有5个计算线程是RUNNABLE,其余的是WAITING.

锁定的原因是什么,为什么我没有设法使用所有cpu?

Hol*_*ger 14

您正在提交作业并在join()之后调用,等待异步作业完成.

流的中间步骤执行元件的角度来看,这意味着中间步骤.map(CompletableFuture::join)在时间(甚至更糟,因为它是一个连续的流)一个元素上运行,没有确保所有元素都通过提交一步了.这会导致线程在等待每次单个计算完成时阻塞.

在开始调用之前,您必须强制提交所有作业join():

List<MyResult> results =
    myInputList.stream()
               .map(myService::getResultFuture)
               .collect(Collectors.toList()).stream()
               .map(CompletableFuture::join)
               .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

如果您可以将results列表中的任何内容表达为要在完成所有操作时调用的操作,则可以通过不阻止线程的方式实现操作join():

List<CompletableFuture<MyResult>> futures = myInputList.stream()
    .map(myService::getResultFuture)
    .collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new))
    .thenRun(() -> {
        List<MyResult> results = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
        // perform action with results
    });
Run Code Online (Sandbox Code Playgroud)

它仍然会调用join()检索结果,但此时所有期货都已完成,因此调用者不会被阻止.

  • @ErnestKiwele它确实提交给执行者服务,但正如Holger所解释的那样,它会立即等待完成提交的单个任务.这与在单个线程上运行相同.Holger的修复程序首先将所有任务提交给执行程序服务,然后在提交所有任务时,等待所有返回的期货. (3认同)