我有一个递归的分治算法,在开始分割之前需要两个计算密集的基本案例任务.最初的基本案例是独立的任务,所以我想并行完成.在基本情况之后,除法运行相同的任务,在0和1之间输入不同的输入,并根据输出决定是否再次拆分.我通过创建一个伪造递归的任务包装器对象来使基本案例工作,但这感觉就像一个kludge,如下所示:
public static void doSomething () {
ForkJoinPool pool = new ForkJoinPool();
private ArrayList<Object> al = new ArrayList<Object>();
TaskWrapper tw = new TaskWrapper(true,-1);
al.addAll(pool.invoke(tw));
}
@SuppressWarnings("serial")
public static class TaskWrapper extends RecursiveTask<ArrayList<Object>> {
private ArrayList<Object> al = new ArrayList<Object>();
private boolean arg;
private double input;
private Object out;
TaskWrapper(boolean ar, double in){
arg = ar;
input = in;
}
@Override
public ArrayList<Object> compute() {
if (arg == false) {
out = new Object(runIntensiveTask(input));
al.add(out);
}
else {
// Right Base …Run Code Online (Sandbox Code Playgroud) java multithreading java.util.concurrent fork-join forkjoinpool
我正在使用CompletableFuture来异步执行从列表源生成的流.
所以我正在测试重载方法,即CompletableFuture的"supplyAsync",其中一个方法只接受单个供应商参数,另一个方法接受供应商参数和执行者参数.以下是两者的文档:
一
supplyAsync(供应商供应商)
返回由ForkJoinPool.commonPool()中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.
第二
supplyAsync(供应商供应商,执行执行人)
返回由给定执行程序中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.
这是我的测试类:
public class TestCompleteableAndParallelStream {
public static void main(String[] args) {
List<MyTask> tasks = IntStream.range(0, 10)
.mapToObj(i -> new MyTask(1))
.collect(Collectors.toList());
useCompletableFuture(tasks);
useCompletableFutureWithExecutor(tasks);
}
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration); …Run Code Online (Sandbox Code Playgroud) executorservice java-8 threadpoolexecutor forkjoinpool completable-future
在我的一个用例中,我需要从多个节点获取数据.每个节点都维护一个数据范围(分区).目标是尽可能快地读取数据.约束是,分区的基数在手头之前是未知的.使用工作共享方法,我可以将分区拆分为子分区并并行获取数据.这种方法的一个缺点是,一个线程有可能获取大量数据并花费更多时间,而另一个线程可以更快地完成.另一种方法是使用工作窃取,我们可以将分区分成更小的范围并使用ForkJoinPool.这种方法的缺点是,如果分区稀疏,我们可以多次往服务器往返,以实现子分区没有数据.
我的问题是,如果我想使用ForkJoinPool,任务可以执行一些I/O操作,我该怎么做?从FJ池的文档和我到目前为止阅读的最佳实践来看,似乎FJ池不利于阻止IO操作.如果我想使用非阻塞IO,我该怎么做?
在 Java 8 中,可以设置一个自定义的 forkJoinPool 以供并行流使用,而不是公共池。
forkJoinPool.submit(() -> list.parallelStream().forEach(x ->{...} ))
Run Code Online (Sandbox Code Playgroud)
我的问题是它在技术上是如何发生的?
流无论如何都不知道它已提交到自定义 forkJoinpool,并且无法直接访问它。那么最终如何使用正确的线程来处理流的任务呢?
我尝试查看源代码但无济于事。我最好的猜测是在提交时的某个时刻设置了一些 threadLocal 变量,然后由流稍后使用。如果是这样,为什么语言开发人员会选择这种方式来实现行为,而不是将池依赖注入到流中?
谢谢!
设置这个 Leaf Target 的想法是让每个线程有 4 个叶子;因此,如果一个线程提前结束,它将有大量的机会窃取工作。
但是 getCommonPoolParallelism() 不再返回 #cores.. 它而是返回 #cores - 1。那么为什么这是当前的 LEAF_TARGET?
我正在使用 ForkJoinPool 调查应用程序中的一些性能问题。我们一直在使用 Dynatrace,有迹象表明一些阻塞操作会持续太长时间。我在 FJP 文档或其他地方找不到足够的关于如何配置和监控我们的 ForkJoinPools 的信息。
在 ForkJoinPools 的上下文中,并行性是什么意思,以及为不同线程池(阻塞/非阻塞)选择哪些值的指南/最佳实践是什么?
如何监控和调整我的 ForkJoinPool?我们正在使用 ForkJoinPool.toString() ,它提供了一些计数器,但我在 javadoc 中找不到关于如何使用此统计信息进行调整的足够信息。getStealCount()被描述为“......应该足够高以保持线程忙碌,但足够低以避免跨线程的开销和争用”,这并没有真正的帮助。
toString() 示例
[Running, parallelism = 48, size = 47, active = 0, running = 0, steals
= 33195, tasks = 0, submissions = 0]
Run Code Online (Sandbox Code Playgroud) 参考Java 的 Fork/Join vs ExecutorService - 何时使用哪个?,一个传统的线程池通常用于处理很多独立的请求;和 aForkJoinPool用于处理连贯/递归任务,其中一个任务可能会产生另一个子任务并稍后加入。
那么,为什么默认parallelStream使用Java-8ForkJoinPool而不是传统的执行器呢?
在很多情况下,我们forEach()在stream()orparallelStream()之后使用,然后提交一个功能接口作为参数。在我看来,这些任务是独立的,不是吗?
parallel-processing concurrency threadpool forkjoinpool java-stream
我正在尝试对一些大数据实施分而治之的解决方案。我使用 fork 和 join 将事物分解为线程。但是我有一个关于分叉机制的问题:如果我将分而治之的条件设置为:
@Override
protected SomeClass compute(){
if (list.size()<LIMIT){
//Do something here
...
}else{
//Divide the list and invoke sub-threads
SomeRecursiveTaskClass subWorker1 = new SomeRecursiveTaskClass(list.subList());
SomeRecursiveTaskClass subWorker2 = new SomeRecursiveTaskClass(list.subList());
invokeAll(subWorker1, subWorker2);
...
}
}
Run Code Online (Sandbox Code Playgroud)
如果没有足够的资源可供调用subWorker(例如池中没有足够的线程),会发生什么情况?Fork/Join 框架是否维护可用线程的池大小?或者我应该将这个条件添加到我的分治逻辑中?
使用 的最佳实践是什么.stream().parallel()?
例如,如果您有一堆阻塞 I/O 调用并且您想检查 if .anyMatch(...),那么并行执行此操作似乎是明智之举。
示例代码:
public boolean hasAnyRecentReference(JobId jobid) {
<...>
return pendingJobReferences.stream()
.parallel()
.anyMatch(pendingRef -> {
JobReference readReference = pendingRef.sync();
Duration referenceAge = timeService.timeSince(readReference.creationTime());
return referenceAge.lessThan(maxReferenceAge)
});
}
Run Code Online (Sandbox Code Playgroud)
乍一看这看起来很合理,因为我们可以同时执行多个阻塞读取,因为我们只关心匹配的任何一个,而不是一个接一个地检查(所以如果每次读取需要 50 毫秒,我们只需要等待 ( 50ms * expectedNumberOfNonRecentRefs ) / numThreads)。
在生产环境中引入此代码是否会对代码库的其他部分产生任何不可预见的性能影响?
java parallel-processing concurrency forkjoinpool java-stream
不传递Executorto CompletableFuture.runAsync(),ForkJoinPool则使用公共。相反,对于我想要异步执行的简单任务(例如,我不需要链接不同的任务),我可能只使用ForkJoinPool.commonPool().execute().
为什么应该优先考虑另一个?例如,是否runAsync()有任何实质性的开销execute()?前者比后者有什么特别的优势吗?
java multithreading asynchronous forkjoinpool completable-future
forkjoinpool ×10
java ×8
java-stream ×4
fork-join ×3
java-8 ×3
concurrency ×2
asynchronous ×1
threadpool ×1