标签: forkjoinpool

Java-5 ThreadPoolExecutor相对于Java-7 ForkJoinPool有什么优势?

Java 5引入了Executor框架形式的线程池对异步任务执行的支持,其核心是java.util.concurrent.ThreadPoolExecutor实现的线程池.Java 7以java.util.concurrent.ForkJoinPool的形式添加了一个备用线程池.

查看各自的API,ForkJoinPool在标准场景中提供了ThreadPoolExecutor功能的超集(虽然严格来说ThreadPoolExecutor提供了比ForkJoinPool更多的调优机会).除此之外,fork/join任务看起来更快(可能是因为工作窃取调度程序)的观察结果显然需要更少的线程(由于非阻塞连接操作),可能会让人觉得ThreadPoolExecutor已被取代ForkJoinPool.

但这真的是对的吗?我读过的所有材料似乎总结为两种类型的线程池之间相当模糊的区别:

  • ForkJoinPool适用于许多依赖的,任务生成的,简短的,几乎不会阻塞(即计算密集型)的任务
  • ThreadPoolExecutor用于少量,独立,外部生成,长期,有时阻塞的任务

这种区别是否正确?我们能说出更具体的内容吗?

java parallel-processing threadpool threadpoolexecutor forkjoinpool

34
推荐指数
2
解决办法
5417
查看次数

Java ForkJoinPool具有非递归任务,是否可以正常工作?

我想Runnable通过一种方法将任务提交到ForkJoinPool:

forkJoinPool.submit(Runnable task)
Run Code Online (Sandbox Code Playgroud)

注意,我使用的是JDK 7.

在引擎盖下,它们被转换为ForkJoinTask对象.我知道当一个任务以递归方式分成较小的任务时,ForkJoinPool是有效的.

题:

如果没有递归,窃取工作仍然可以在ForkJoinPool中工作吗?

在这种情况下值得吗?

更新1: 任务很小,可能不平衡.即使对于严格相同的任务,诸如上下文切换,线程调度,停车,页面未命中等事情也会妨碍导致不平衡.

更新2: Doug Lea在Concurrency JSR-166兴趣小组中写道,给出了一个暗示:

当所有任务都是异步并提交到池而不是分叉时,这也极大地提高了吞吐量,这成为构造actor框架的合理方法,以及许多您可能使用ThreadPoolExecutor的普通服务.

我认为,当涉及到相当小的CPU绑定任务时,由于这种优化,ForkJoinPool是可行的方法.重点是这些任务已经很小,不需要递归分解.工作窃取工作,无论是大型还是小型任务 - 任务都可以被来自忙碌工人的Deque尾巴的另一个自由工作者抓住.

更新3: ForkJoinPool的可扩展性 - Akka乒乓球队的基准测试显示了很好的结果.

尽管如此,要更有效地应用ForkJoinPool需要进行性能调整.

java multithreading fork-join work-stealing forkjoinpool

25
推荐指数
1
解决办法
2512
查看次数

官方文档在哪里说Java的并行流操作使用fork/join?

以下是我对Java 8 的Stream框架的理解:

  1. 东西产生了源
  2. 该实现负责提供BaseStream#parallel()方法,该方法依次返回可以并行运行其操作的Stream.

虽然有人已经找到了一种方法来使用自定义线程池和Stream框架的并行执行,但我不能在Java 8 API中找到任何提及默认Java 8并行Stream实现将使用ForkJoinPool#commonPool()的内容.(Collection#parallelStream(),StreamSupport类中的方法,以及API中我不知道的其他可能的并行启用流源).

我只能搜索搜索结果的花絮是:


所以我的问题是:

在哪里说ForkJoinPool#commonPool()用于对从Java 8 API获得的流进行并行操作?

java fork-join java-8 forkjoinpool java-stream

24
推荐指数
2
解决办法
6295
查看次数

Java支持三种不同的并发模型

我在多线程环境中经历了不同的并发模型(http://tutorials.jenkov.com/java-concurrency/concurrency-models.html)

本文重点介绍了三种并发模型.

  1. 并行工人

    第一个并发模型就是我所说的并行工作模型.传入的工作分配给不同的工作人员.

  2. 流水线

    工人们正在举办类似的工人在工厂的流水线.每个工人只执行完整工作的一部分.当该部分完成时,工人将工作转发给下一个工人.

    每个工作者都在自己的线程中运行,并且不与其他工作者共享任何状态.这有时也称为无共享并发模型.

  3. 功能并行

    函数并行的基本思想是使用函数调用实现程序.函数可以被视为彼此发送消息的" 代理 "或" 参与者 ",就像在流水线并发模型(AKA反应或事件驱动系统)中一样.当一个函数调用另一个函数时,这类似于发送消息.

现在我想为这三个概念映射java API支持

  1. 并行工作者:它是ExecutorService,ThreadPoolExecutor,CountDownLatch API吗?

  2. 装配线:将事件发送到JMS等消息传递系统并使用队列和主题的消息传递概念.

  3. 功能并行:ForkJoinPool在某种程度上和java 8流.与溪流相比,ForkJoin池很容易理解.

我是否正确映射这些并发模型?如果没有,请纠正我.

java multithreading executorservice countdownlatch forkjoinpool

24
推荐指数
2
解决办法
2492
查看次数

CompletableFuture的完成处理程序在哪个线程中执行?

我有一个关于CompletableFuture方法的问题:

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
Run Code Online (Sandbox Code Playgroud)

事情是JavaDoc说的就是这样:

返回一个新的CompletionStage,当该阶段正常完成时,将使用此阶段的结果作为所提供函数的参数执行.有关特殊完成的规则​​,请参阅CompletionStage文档.

线程怎么样?这将在哪个线程中执行?如果未来由线程池完成怎么办?

java multithreading forkjoinpool completable-future

21
推荐指数
4
解决办法
3929
查看次数

Java8 ForkJoinPool和Executors.newWorkStealingPool之间的详细区别?

使用中的低级差异是什么:

ForkJoinPool = new ForkJoinPool(X);
Run Code Online (Sandbox Code Playgroud)

ExecutorService ex = Executors.neWorkStealingPool(X);
Run Code Online (Sandbox Code Playgroud)

其中X是所需的并行级别,即线程运行..

根据文档我发现它们相似.还告诉我哪一个在任何正常用途下更合适和安全.我有1.3亿个条目写入BufferedWriter并使用Unix排序按第1列排序.

另请告诉我如果可能的话要保留多少个线程.

注意:我的系统有 8个核心处理器和 32 GB RAM.

multithreading executorservice fork-join executors forkjoinpool

15
推荐指数
2
解决办法
5936
查看次数

在并行流上调用顺序会使所有先前的操作顺序进行

我有一个重要的数据集,并希望调用缓慢但干净的方法,而不是调用快速方法,副作用对第一个结果.我对中间结果不感兴趣,所以我不想收集它们.

显而易见的解决方案是创建并行流,进行慢速呼叫,再次使流顺序,并进行快速呼叫.问题是,所有代码都在单线程中执行,没有实际的并行性.

示例代码:

@Test
public void testParallelStream() throws ExecutionException, InterruptedException
{
    ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
    Set<String> threads = forkJoinPool.submit(()-> new Random().ints(100).boxed()
            .parallel()
            .map(this::slowOperation)
            .sequential()
            .map(Function.identity())//some fast operation, but must be in single thread
            .collect(Collectors.toSet())
    ).get();
    System.out.println(threads);
    Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size());
}

private String slowOperation(int value)
{
    try
    {
        Thread.sleep(100);
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    }
    return Thread.currentThread().getName();
}
Run Code Online (Sandbox Code Playgroud)

如果我删除sequential,代码按预期执行,但显然,非并行操作将在多个线程中调用.

你能推荐一些关于这种行为的引用,或者某些方法可以避免临时收集吗?

java multithreading forkjoinpool java-stream

13
推荐指数
1
解决办法
2566
查看次数

Java 8并行流和ThreadLocal

我试图找出如何在Java 8并行流中复制ThreadLocal值.

所以如果我们考虑这个:

    public class ThreadLocalTest {

        public static void main(String[] args)  {
            ThreadContext.set("MAIN");
            System.out.printf("Main Thread: %s\n", ThreadContext.get());

            IntStream.range(0,8).boxed().parallel().forEach(n -> {
                System.out.printf("Parallel Consumer - %d: %s\n", n, ThreadContext.get());
            });
        }

        private static class ThreadContext {
            private static ThreadLocal<String> val = ThreadLocal.withInitial(() -> "empty");

            public ThreadContext() {
            }

            public static String get() {
                return val.get();
            }

            public static void set(String x) {
                ThreadContext.val.set(x);
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

哪个输出

Main Thread: MAIN
Parallel Consumer - 5: MAIN
Parallel Consumer - 4: MAIN
Parallel …
Run Code Online (Sandbox Code Playgroud)

java threadpool java-8 forkjoinpool

12
推荐指数
1
解决办法
4169
查看次数

什么是ForkJoinPool异步模式

ForkJoinPool的异步模式是什么意思?Javadoc提到它使队列(它是每线程队列吗?)FIFO而不是LIFO.这在实践中意味着什么?

java fork-join threadpool forkjoinpool

11
推荐指数
2
解决办法
4237
查看次数

Scala异步与Java ForkJoinTask

前段时间我发现了Scala异步项目.问题是:这个async块中有什么神奇的东西不能通过普通函数实现(没有宏扩展)?

让我们看一下介绍中的第一个例子:

import ExecutionContext.Implicits.global
import scala.async.Async.{async, await}

val future = async {
    val f1 = async { ...; true }
    val f2 = async { ...; 42 }
    if (await(f1)) await(f2) else 0
}
Run Code Online (Sandbox Code Playgroud)

我在上面的例子中看不到任何不能用纯Java编写的东西.这段代码完全相同:

import java.util.concurrent.*;
import java.util.function.Supplier;

// First define a helper method for creating async blocks:
public static <T> ForkJoinTask<T> async(Supplier<T> supplier) {
    return new RecursiveTask<T>() {
        @Override
        protected T compute() {
            return supplier.get();
        }
    }.fork();
}

ForkJoinTask<Integer> future = ForkJoinPool.commonPool().submit(() -> { …
Run Code Online (Sandbox Code Playgroud)

java asynchronous scala async-await forkjoinpool

11
推荐指数
1
解决办法
842
查看次数