据我所知,jdk7支持fork-and-join,我可以在JDK6中使用fork-and-join而无需升级到JDK7.0吗?
例如,工作窃取在Java平台上的Fork/Join框架中可用.(请参阅fork/join框架如何比线程池更好?) - 与OmniThreadLibrary类似吗?
工作窃取:用尽线索的工作线程可以从其他忙碌的线程中窃取任务.
我是Scala/Akka的新手,虽然我非常熟悉基于actor的建模概念.我试图并行化现有代码以获得更好的性能,我有两个版本:一个在Scala/Akka中,一个在Java 7的ForkJoinPool中.
我期待基于演员的方法应该更快,但结果却相反.Scala/Akka的时间为20秒,而Java fork/join则为17秒.
我想知道akka本质上是否较慢?或者可能是因为我在我的两个实现中使用普通Java编写的现有代码中的类?
我在Java 7中阅读了一篇关于fork-join框架的精彩文章,其思想是,使用ForkJoinPool和ForkJoinTask,池中的线程可以从其他任务中获取子任务,因此它可以使用更少的线程来处理更多任务.
然后我尝试使用法线ExecutorService来做同样的工作,发现我无法区分,因为当我向池中提交新任务时,任务将在另一个可用线程上运行.
我能说的唯一区别是,如果我使用ForkJoinPool,我不需要将池传递给任务,因为我可以调用task.fork()它使其在另一个线程上运行.但是正常情况下ExecutorService,我必须将池传递给任务,或者使它成为静态,所以在任务中,我可以调用pool.submit(newTask)
我错过了什么吗?
(您可以从https://github.com/freewind/fork-join-test/tree/master/src查看生活代码)
我目前正在链接一堆http请求,但是在订阅之前我无法处理404错误.
我的代码:
在模板中:
...
service.getData().subscribe(
data => this.items = data,
err => console.log(err),
() => console.log("Get data complete")
)
...
Run Code Online (Sandbox Code Playgroud)
在服务中:
...
getDataUsingUrl(url) {
return http.get(url).map(res => res.json());
}
getData() {
return getDataUsingUrl(urlWithData).flatMap(res => {
return Observable.forkJoin(
// make http request for each element in res
res.map(
e => getDataUsingUrl(anotherUrlWithData)
)
)
}).map(res => {
// 404s from previous forkJoin
// How can I handle the 404 errors without subscribing?
// I am looking to make more http requests from …Run Code Online (Sandbox Code Playgroud) 我试图弄清楚blocking构造。虽然还不清楚它的内部工作原理,但是我得到的一般想法是,只要我使用Scala的全局线程池,用blocking上下文包装我的代码就可以确保线程池会为这项工作创造额外的空间(因为它不受CPU限制)。
(1 to 1000).foreach { i =>
Future {
println(i)
Thread.sleep(100 * 1000)
}
}
Run Code Online (Sandbox Code Playgroud)
将迅速显示只能同时运行8个作业,而
(1 to 1000).foreach { i =>
Future {
blocking {
println(i)
Thread.sleep(100 * 1000)
}
}
}
Run Code Online (Sandbox Code Playgroud)
将显示现在我们大约有250个同时工作。哇!然后让我措手不及的是
(1 to 1000).foreach { i =>
Future {
println(i)
Thread.sleep(100 * 1000)
}
}
('a' to 'z').foreach { c =>
Future {
blocking {
println(c)
Thread.sleep(100 * 1000)
}
}
}
Run Code Online (Sandbox Code Playgroud)
只会再次显示8个同时进行的作业-阻止的作业不会立即执行。
为什么是这样?blocking上下文的内部机制到底是什么?
题
由于 Fork-Join 似乎是当前的炒作并在许多答案中被推荐,我想:为什么不对它的实际速度进行一些研究?
为了衡量这一点,我编写了一个小程序(见下面的代码),它做一些数字相加,并用各种参数将其分叉出来,包括线程数、分叉深度和分叉传播,然后测量执行时间,尤其是实际计算所花费的时间与分叉所花费的时间。
摘要答案
虽然实施得很好,但 ForkJoin 是并行化任务的一种极其低效的方式,因为每个 fork 的成本非常高。一个简单的问题优化实现可以轻松地存档 99% 的线程执行时间(这超过了使用 Fork-Join 测量的所有内容),因此这样的实现总是比 Fork-Join 实现更快。此外,如果每个 fork 的实际任务很小,则 Fork-Join 实现可能比单线程线性实现慢得多。
所以 Fork-Join 更重要的是它是否有助于您的代码架构,因为它与其他实现相比没有任何性能优势。因此,只有在以下情况下才应使用 Fork-Join:
性能并不重要,任务经常需要等待其他任务的结果才能继续。所以基本上如果 Fork-Join 结构大大简化了一个简单的实现的任务。
实际任务需要大大超过分叉的成本,因此损失可以忽略不计。在我的测试中,添加 2 个值的循环必须在每个 fork 循环至少 10000 次才能获得合理的性能。
编辑:请参阅此处了解我被指出的更深入的分析。
测试设置
在我的程序中,我有一个 RecursiveTask 计算给定 N 的斐波那契数列,这将实际计算减少到 3 个分配和 1 个加法。对于任何给定的 CPU,这应该是一项次要任务。
在测试中,我改变了线程数量、每个任务的分叉数量和斐波那契循环的长度。此外,我对 async 参数进行了一些测试,但将其设置为 false 只会显示计算时间的轻微减少,因此我跳过了它。传播参数(fork per fork)也大部分被跳过,因为结果没有显着差异。
一般来说,计算时间是非常稳定的,实际花费在任务上的时间百分比通常变化不到 1%,因此每个测试集已经在其他空闲系统上运行了大约 5 次(如果数字不稳定,则更多)具有 4 个核心(+4 个超核心),然后选择了中值执行时间。
已通过各种测试变量验证了正确执行,特别是已验证使用的实际线程数与最初给定的并行度参数绝无不同。
详细测试结果
在哪里:
Time total 是从主线程的角度来看整个计算所花费的总时间。Time task 是在所有分叉组合中实际计算斐波那契数列所花费的时间。Time task percentage …假设我有如下代码:
Future<Object> executeBy(ExecutorService executor) {
return executor.submit(() -> {
throw new IllegalStateException();
});
}
Run Code Online (Sandbox Code Playgroud)
使用ForkJoinPool#commonPool时没有问题,但是当我使用并行性时ForkJoinPool它会加倍IllegalStateException.例如:
executeBy(new ForkJoinPool(1)).get();
// ^--- double the IllegalStateException
Run Code Online (Sandbox Code Playgroud)
Q1:为什么并行ForkJoinPool一倍Exception发生在Callable?
Q2:如何避免这种奇怪的行为?
我正在尝试对一些大数据实施分而治之的解决方案。我使用 fork 和 join 将事物分解为线程。但是我有一个关于分叉机制的问题:如果我将分而治之的条件设置为:
@Override
protected SomeClass compute(){
if (list.size()<LIMIT){
//Do something here
...
}else{
//Divide the list and invoke sub-threads
SomeRecursiveTaskClass subWorker1 = new SomeRecursiveTaskClass(list.subList());
SomeRecursiveTaskClass subWorker2 = new SomeRecursiveTaskClass(list.subList());
invokeAll(subWorker1, subWorker2);
...
}
}
Run Code Online (Sandbox Code Playgroud)
如果没有足够的资源可供调用subWorker(例如池中没有足够的线程),会发生什么情况?Fork/Join 框架是否维护可用线程的池大小?或者我应该将这个条件添加到我的分治逻辑中?
我想在两个 Observable 返回值后调用一个方法。我做了一些搜索,这似乎forkJoin是我想要的,但我无法让它工作。我知道这两个 Observable 都在返回值,因为我在组件的其他地方单独使用每个数据,所以很明显我做错了其他事情。
这是我尝试过的。我正在使用 rxjs v6.4。
forkJoin(
this.store.pipe(select(fromStore.getAppointmentsLoading)),
this.clientStore.pipe(select(fromClientStore.getClientsLoading)),
).subscribe(
([res1, res2]) => {
console.log('res1', res1);
console.log('res2', res2);
},
err => console.error(err),
);
Run Code Online (Sandbox Code Playgroud)
没有任何内容记录到控制台,我也没有收到任何错误。同样,我传入的 Observable 肯定是返回值。
我做错了什么,还是我完全使用错误的方法forkJoin?
fork-join ×10
java ×6
angular ×2
performance ×2
rxjs ×2
akka ×1
concurrency ×1
delphi ×1
exception ×1
forkjoinpool ×1
http ×1
java-8 ×1
jdk1.6 ×1
ngrx ×1
observable ×1
scala ×1