Java 5引入了Executor框架形式的线程池对异步任务执行的支持,其核心是java.util.concurrent.ThreadPoolExecutor实现的线程池.Java 7以java.util.concurrent.ForkJoinPool的形式添加了一个备用线程池.
查看各自的API,ForkJoinPool在标准场景中提供了ThreadPoolExecutor功能的超集(虽然严格来说ThreadPoolExecutor提供了比ForkJoinPool更多的调优机会).除此之外,fork/join任务看起来更快(可能是因为工作窃取调度程序)的观察结果显然需要更少的线程(由于非阻塞连接操作),可能会让人觉得ThreadPoolExecutor已被取代ForkJoinPool.
但这真的是对的吗?我读过的所有材料似乎总结为两种类型的线程池之间相当模糊的区别:
这种区别是否正确?我们能说出更具体的内容吗?
java parallel-processing threadpool threadpoolexecutor forkjoinpool
我想Runnable
通过一种方法将任务提交到ForkJoinPool:
forkJoinPool.submit(Runnable task)
Run Code Online (Sandbox Code Playgroud)
注意,我使用的是JDK 7.
在引擎盖下,它们被转换为ForkJoinTask对象.我知道当一个任务以递归方式分成较小的任务时,ForkJoinPool是有效的.
题:
如果没有递归,窃取工作仍然可以在ForkJoinPool中工作吗?
在这种情况下值得吗?
更新1: 任务很小,可能不平衡.即使对于严格相同的任务,诸如上下文切换,线程调度,停车,页面未命中等事情也会妨碍导致不平衡.
更新2: Doug Lea在Concurrency JSR-166兴趣小组中写道,给出了一个暗示:
当所有任务都是异步并提交到池而不是分叉时,这也极大地提高了吞吐量,这成为构造actor框架的合理方法,以及许多您可能使用ThreadPoolExecutor的普通服务.
我认为,当涉及到相当小的CPU绑定任务时,由于这种优化,ForkJoinPool是可行的方法.重点是这些任务已经很小,不需要递归分解.工作窃取工作,无论是大型还是小型任务 - 任务都可以被来自忙碌工人的Deque尾巴的另一个自由工作者抓住.
更新3: ForkJoinPool的可扩展性 - Akka乒乓球队的基准测试显示了很好的结果.
尽管如此,要更有效地应用ForkJoinPool需要进行性能调整.
以下是我对Java 8 的Stream框架的理解:
虽然有人已经找到了一种方法来使用自定义线程池和Stream框架的并行执行,但我不能在Java 8 API中找到任何提及默认Java 8并行Stream实现将使用ForkJoinPool#commonPool()的内容.(Collection#parallelStream(),StreamSupport类中的方法,以及API中我不知道的其他可能的并行启用流源).
我只能搜索搜索结果的花絮是:
Lambda的状态:Libraries Edition("引擎盖下的并行")
Vaguely提到Stream框架和Fork/Join机制.
Fork/Join机器旨在实现此过程的自动化.
JEP 107:集合的批量数据操作
几乎直接表明Collection接口的默认方法#parallelStream()使用Fork/Join实现自身.但仍然没有关于公共池.
并行实现基于Java 7中引入的java.util.concurrency Fork/Join实现.
类数组(Javadoc)
直接表示使用公共池的多次.
ForkJoin公共池用于执行任何并行任务.
所以我的问题是:
在哪里说ForkJoinPool#commonPool()用于对从Java 8 API获得的流进行并行操作?
我在多线程环境中经历了不同的并发模型(http://tutorials.jenkov.com/java-concurrency/concurrency-models.html)
本文重点介绍了三种并发模型.
并行工人
第一个并发模型就是我所说的并行工作模型.传入的工作分配给不同的工作人员.
流水线
工人们正在举办类似的工人在工厂的流水线.每个工人只执行完整工作的一部分.当该部分完成时,工人将工作转发给下一个工人.
每个工作者都在自己的线程中运行,并且不与其他工作者共享任何状态.这有时也称为无共享并发模型.
功能并行
函数并行的基本思想是使用函数调用实现程序.函数可以被视为彼此发送消息的" 代理 "或" 参与者 ",就像在流水线并发模型(AKA反应或事件驱动系统)中一样.当一个函数调用另一个函数时,这类似于发送消息.
现在我想为这三个概念映射java API支持
并行工作者:它是ExecutorService,ThreadPoolExecutor,CountDownLatch API吗?
装配线:将事件发送到JMS等消息传递系统并使用队列和主题的消息传递概念.
功能并行:ForkJoinPool在某种程度上和java 8流.与溪流相比,ForkJoin池很容易理解.
我是否正确映射这些并发模型?如果没有,请纠正我.
java multithreading executorservice countdownlatch forkjoinpool
我有一个关于CompletableFuture方法的问题:
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
Run Code Online (Sandbox Code Playgroud)
事情是JavaDoc说的就是这样:
返回一个新的CompletionStage,当该阶段正常完成时,将使用此阶段的结果作为所提供函数的参数执行.有关特殊完成的规则,请参阅CompletionStage文档.
线程怎么样?这将在哪个线程中执行?如果未来由线程池完成怎么办?
使用中的低级差异是什么:
ForkJoinPool = new ForkJoinPool(X);
Run Code Online (Sandbox Code Playgroud)
和
ExecutorService ex = Executors.neWorkStealingPool(X);
Run Code Online (Sandbox Code Playgroud)
其中X是所需的并行级别,即线程运行..
根据文档我发现它们相似.还告诉我哪一个在任何正常用途下更合适和安全.我有1.3亿个条目写入BufferedWriter并使用Unix排序按第1列排序.
另请告诉我如果可能的话要保留多少个线程.
注意:我的系统有 8个核心处理器和 32 GB RAM.
multithreading executorservice fork-join executors forkjoinpool
我有一个重要的数据集,并希望调用缓慢但干净的方法,而不是调用快速方法,副作用对第一个结果.我对中间结果不感兴趣,所以我不想收集它们.
显而易见的解决方案是创建并行流,进行慢速呼叫,再次使流顺序,并进行快速呼叫.问题是,所有代码都在单线程中执行,没有实际的并行性.
示例代码:
@Test
public void testParallelStream() throws ExecutionException, InterruptedException
{
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
Set<String> threads = forkJoinPool.submit(()-> new Random().ints(100).boxed()
.parallel()
.map(this::slowOperation)
.sequential()
.map(Function.identity())//some fast operation, but must be in single thread
.collect(Collectors.toSet())
).get();
System.out.println(threads);
Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size());
}
private String slowOperation(int value)
{
try
{
Thread.sleep(100);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
return Thread.currentThread().getName();
}
Run Code Online (Sandbox Code Playgroud)
如果我删除sequential
,代码按预期执行,但显然,非并行操作将在多个线程中调用.
你能推荐一些关于这种行为的引用,或者某些方法可以避免临时收集吗?
我试图找出如何在Java 8并行流中复制ThreadLocal值.
所以如果我们考虑这个:
public class ThreadLocalTest {
public static void main(String[] args) {
ThreadContext.set("MAIN");
System.out.printf("Main Thread: %s\n", ThreadContext.get());
IntStream.range(0,8).boxed().parallel().forEach(n -> {
System.out.printf("Parallel Consumer - %d: %s\n", n, ThreadContext.get());
});
}
private static class ThreadContext {
private static ThreadLocal<String> val = ThreadLocal.withInitial(() -> "empty");
public ThreadContext() {
}
public static String get() {
return val.get();
}
public static void set(String x) {
ThreadContext.val.set(x);
}
}
}
Run Code Online (Sandbox Code Playgroud)
哪个输出
Main Thread: MAIN
Parallel Consumer - 5: MAIN
Parallel Consumer - 4: MAIN
Parallel …
Run Code Online (Sandbox Code Playgroud) ForkJoinPool的异步模式是什么意思?Javadoc提到它使队列(它是每线程队列吗?)FIFO而不是LIFO.这在实践中意味着什么?
前段时间我发现了Scala异步项目.问题是:这个async
块中有什么神奇的东西不能通过普通函数实现(没有宏扩展)?
让我们看一下介绍中的第一个例子:
import ExecutionContext.Implicits.global
import scala.async.Async.{async, await}
val future = async {
val f1 = async { ...; true }
val f2 = async { ...; 42 }
if (await(f1)) await(f2) else 0
}
Run Code Online (Sandbox Code Playgroud)
我在上面的例子中看不到任何不能用纯Java编写的东西.这段代码完全相同:
import java.util.concurrent.*;
import java.util.function.Supplier;
// First define a helper method for creating async blocks:
public static <T> ForkJoinTask<T> async(Supplier<T> supplier) {
return new RecursiveTask<T>() {
@Override
protected T compute() {
return supplier.get();
}
}.fork();
}
ForkJoinTask<Integer> future = ForkJoinPool.commonPool().submit(() -> { …
Run Code Online (Sandbox Code Playgroud) forkjoinpool ×10
java ×9
fork-join ×4
threadpool ×3
java-8 ×2
java-stream ×2
async-await ×1
asynchronous ×1
executors ×1
scala ×1