带有调用者工作窃取的自定义池的Java parallelStream()?

Jes*_*lle 9 java fork-join java-8 forkjoinpool java-stream

通常当使用Java 8的parallelStream()时,结果是通过默认的公共fork-join池(即ForkJoinPool.commonPool())执行.

然而,如果一个人的工作远离CPU限制,例如可能在很多时候都在等待IO,那么这显然是不可取的.在这种情况下,人们会想要使用一个单独的池,根据其他标准确定大小(例如,任务可能实际使用CPU的时间有多少).

有没有明显得到parallelStream()使用不同池的方式,但作为一个详细的方式在这里.

不幸的是,这种方法需要从fork-join池线程调用并行流上的终端操作.这样做的缺点是,如果目标分支连接池完全忙于现有工作,整个执行将等待它,而什么都不做.因此,池可能成为比单线程执行更糟糕的瓶颈.相反,当以"正常"方式使用parallelStream()时,使用ForkJoinPool.common.externalHelpComplete()或ForkJoinPool.common.tryExternalUnpush()来让池外的调用线程帮助处理.

有谁知道的方式获得parallelStream()使用非默认的fork-join池,并有从的fork-join池帮助外的调用线程在这项工作中的处理(但fork-不休息加入游泳池的工作)?

Hol*_*ger 2

您可以在泳池上使用awaitQuiescence来帮忙。但是,您可以\xe2\x80\x99t 选择您将帮助的任务,它只会从池中获取下一个待处理任务,因此,如果有更多待处理任务,您可能会在到达您的任务之前最终执行这些任务自己的。

\n\n
ForkJoinPool forkJoinPool = new ForkJoinPool(1);\n// make all threads busy:\nforkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));\n// submit our task (may contain your stream operation)\nForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread());\n// help out\nwhile(!task.isDone()) // use zero timeout to execute one task only\n    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS);\nSystem.out.println(Thread.currentThread()==task.get());\n
Run Code Online (Sandbox Code Playgroud)\n\n

将打印true.

\n\n

然而

\n\n
ForkJoinPool forkJoinPool = new ForkJoinPool(1);\n// make all threads busy:\nforkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));\n// overload:\nforkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));\n// submit our task (may contain your stream operation)\nForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread());\n// help out\nwhile(!task.isDone())\n    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS);\nSystem.out.println(Thread.currentThread()==task.get());\n
Run Code Online (Sandbox Code Playgroud)\n\n

当它尝试执行第二个阻塞任务时将永远挂起。

\n\n

尽管如此,它会让启动线程帮助处理 pool\xe2\x80\x99s 待处理任务,只要没有无限任务,这将提高其自己的任务被执行的机会(上面的例子是极端的,仅用于演示)。

\n\n
\n\n

但请注意,Fork/Join 框架和 API 之间的整个关系Stream无论如何都是一个实现细节。

\n