Brian Goetz在http://www.ibm.com/developerworks/java/library/j-jtp03048.html上写了一篇关于fork-join的好文章.在其中,他列出了使用fork-join机制的合并排序算法,在该机制中,他并行执行数组两侧的排序,然后合并结果.
该算法同时对同一阵列的两个不同部分进行排序.为什么不是AtomicIntegerArray或维持可见性所需的其他机制?什么保证一个线程会看到另一个线程完成的写入,或者这是一个微妙的错误?作为后续跟进,Scala的ForkJoinScheduler是否也提供此保证?
谢谢!
我正在比较测试程序的两个变体.两者都ForkJoinPool
在具有四个内核的机器上使用4线程运行.
在'模式1'中,我非常像执行器服务使用池.我把一堆任务扔进去ExecutorService.invokeAll
.我获得了比普通的固定线程执行器服务更好的性能(即使有调用Lucene,那里也有一些I/O).
这里没有分而治之的.从字面上看,我做到了
ExecutorService es = new ForkJoinPool(4);
es.invokeAll(collection_of_Callables);
Run Code Online (Sandbox Code Playgroud)
在'模式2'中,我向池中提交单个任务,并在该任务中调用ForkJoinTask.invokeAll来提交子任务.所以,我有一个继承自的对象,RecursiveAction
它被提交到池中.在该类的compute方法中,我调用了invokeAll
来自不同类的对象集合,这些对象也继承自RecursiveAction
.出于测试目的,我只提交第一个对象的一次一个.我天真地期望看到所有四个线程忙什么,因为线程调用invokeAll
将为自己抓取一个子任务而不是仅仅坐着和阻塞.我可以想到为什么它可能不会那样工作的一些原因.
在VisualVM中观察,在模式2中,一个线程几乎总是在等待.我希望看到的是调用invokeAll的线程立即开始处理其中一个被调用的任务,而不仅仅是静坐.这肯定比使用普通线程池尝试此方案所导致的死锁更好,但仍然是什么?它是否保留一个线程,以防其他东西被提交?而且,如果是这样,为什么模式1中的问题不同?
到目前为止,我一直使用添加到java 1.6的引导类路径的jsr166 jar来运行它.
我尝试使用ForkJoinPool 来并行化我的CPU密集型计算.我对ForkJoinPool的理解是,只要任何任务可以执行,它就会继续工作.不幸的是,我经常观察工作线程空闲/等待,因此并非所有CPU都保持忙碌状态.有时我甚至观察到额外的工作线程.
我没想到这一点,因为我严格尝试使用非阻塞任务.我的观察非常类似于ForkJoinPool似乎浪费了一个线程.在对ForkJoinPool进行了大量调试之后我猜了一下:
我使用invokeAll()在子任务列表上分配工作.在invokeAll()完成后执行第一个任务本身,它开始加入其他任务.这很好,直到下一个要连接的任务位于执行队列之上.不幸的是,我提交了异步的其他任务而没有加入它们.我期望ForkJoin框架首先继续执行这些任务,然后再转回加入任何剩余的任务.
但它似乎不是这样工作的.相反,工作线程停止调用wait()直到等待的任务准备好(可能是由其他工作线程执行).我没有验证这一点,但似乎是调用join()的一般缺陷.
ForkJoinPool提供了一个asyncMode,但这是一个全局参数,不能用于单个提交.但我喜欢看到我的异步分叉任务很快就会被执行.
那么,为什么ForkJoinTask.doJoin()不是简单地在其队列之上执行任何可用任务,直到它准备好(由自己执行或被其他人窃取)?
我有一个递归的分治算法,在开始分割之前需要两个计算密集的基本案例任务.最初的基本案例是独立的任务,所以我想并行完成.在基本情况之后,除法运行相同的任务,在0和1之间输入不同的输入,并根据输出决定是否再次拆分.我通过创建一个伪造递归的任务包装器对象来使基本案例工作,但这感觉就像一个kludge,如下所示:
public static void doSomething () {
ForkJoinPool pool = new ForkJoinPool();
private ArrayList<Object> al = new ArrayList<Object>();
TaskWrapper tw = new TaskWrapper(true,-1);
al.addAll(pool.invoke(tw));
}
@SuppressWarnings("serial")
public static class TaskWrapper extends RecursiveTask<ArrayList<Object>> {
private ArrayList<Object> al = new ArrayList<Object>();
private boolean arg;
private double input;
private Object out;
TaskWrapper(boolean ar, double in){
arg = ar;
input = in;
}
@Override
public ArrayList<Object> compute() {
if (arg == false) {
out = new Object(runIntensiveTask(input));
al.add(out);
}
else {
// Right Base …
Run Code Online (Sandbox Code Playgroud) java multithreading java.util.concurrent fork-join forkjoinpool
在a中ForkJoinPool
ForkJoinTask
,当前工作线程是否参与工作窃取?
我已经读到了fork连接池可以从阻塞或等待线程中窃取的含义.目前的工人似乎是一个明显的候选人 一旦工作人员调用.join()
另一个任务,那么该任务基本上被阻止.
另一方面,我看到许多文章暗示了不同的结论.例如,当前工作者线程在等待分叉任务之前应该工作的普遍共识.
有迹象表明,讨论使用的几篇文章ForkJoinTask.getSurplusQueuedTaskCount
,通过具有当前工人平衡在队列中的工作方法做了一些工作.如果当前工人也在偷窃,那么这似乎没有必要.
当然,我希望最大化线程操作并使所有工作者最大限度地运行.了解当前线程是否也窃取工作(例如何时.join
被调用)将有助于澄清.
是否可以配置ForkJoinPool
为使用1个执行线程?
我正在执行Random
在一个内部调用的代码ForkJoinPool
.每次运行时,我都会遇到不同的运行时行为,因此很难调查回归.
我希望代码库提供"调试"和"发布"模式."debug"模式将Random
使用固定种子配置,并ForkJoinPool
使用单个执行线程."release"模式将使用系统提供的Random
种子并使用默认的ForkJoinPool
线程数.
我尝试ForkJoinPool
使用1的并行性配置,但它使用2个线程(main
和第二个工作线程).有任何想法吗?
在这里,我使用forkJoin
rxjs来并行地订阅一个可观察对象数组。但是我要一个一个地订阅,什么是最好的解决方案?
下面是我的代码:
var observables = [];
Observable.forkJoin(observables)
.subscribe(() => {
this.msgs = [];
this.msgs.push({
severity: 'success',
summary: 'Saved Successfully'
});
this.onSaveComplete();
}, (error: any) => this.errorMessage = <any>error);
}, (error: any) => this.errorMessage = <any>error);
Run Code Online (Sandbox Code Playgroud) 我有一些不是线程安全的类:
class ThreadUnsafeClass {
long i;
long incrementAndGet() { return ++i; }
}
Run Code Online (Sandbox Code Playgroud)
(我在long
这里使用了a 作为字段,但我们应该将其字段视为一些线程不安全的类型).
我现在有一个看起来像这样的课程
class Foo {
final ThreadUnsafeClass c;
Foo(ThreadUnsafeClass c) {
this.c = c;
}
}
Run Code Online (Sandbox Code Playgroud)
也就是说,线程不安全类是它的最后一个字段.现在我要这样做:
public class JavaMM {
public static void main(String[] args) {
final ForkJoinTask<ThreadUnsafeClass> work = ForkJoinTask.adapt(() -> {
ThreadUnsafeClass t = new ThreadUnsafeClass();
t.incrementAndGet();
return new FC(t);
});
assert (work.fork().join().c.i == 1);
}
}
Run Code Online (Sandbox Code Playgroud)
也就是说,从thread T
(main),我调用了一些工作T'
(fork-join-pool),它创建并改变了我的不安全类的实例,然后返回包含在a中的结果Foo
.请注意,我的线程不安全类的所有变异都发生在一个线程上T'
.
问题1:我是否保证thread-unsafe-class实例的结束状态安全地移植到?的 …
java parallel-processing fork-join java-memory-model java-stream
我想在Java 8中试用ForkJoinPool,所以我编写了一个小程序,用于搜索名称中包含给定目录中特定关键字的所有文件.
计划:
public class DirectoryService {
public static void main(String[] args) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
List<String> files = pool.invoke(task);
pool.shutdown();
System.out.println("Total no of files with hello" + files.size());
}
}
class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
private String path;
public FileSearchRecursiveTask(String path) {
this.path = path;
}
@Override
protected List<String> compute() {
File mainDirectory = new File(path);
List<String> filetedFileList = new ArrayList<>();
List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
if(mainDirectory.isDirectory()) {
System.out.println(Thread.currentThread() + " …
Run Code Online (Sandbox Code Playgroud) 免责声明:这是我第一次使用Java的Fork-Join框架,所以我并不是100%确定我正确使用它.Java也不是我的主要编程语言,所以这也可能是相关的.
鉴于以下SSCCE:
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
class ForkCalculator extends RecursiveAction
{
private final Integer[] delayTasks;
public ForkCalculator(Integer[] delayTasks)
{
this.delayTasks = delayTasks;
}
@Override
protected void compute()
{
if (this.delayTasks.length == 1) {
this.computeDirectly();
return;
}
Integer halfway = this.delayTasks.length / 2;
ForkJoinTask.invokeAll(
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, 0, halfway)
),
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
)
);
}
private void computeDirectly()
{
Integer delayTask = this.delayTasks[0];
try {
Thread.sleep(delayTask);
} catch …
Run Code Online (Sandbox Code Playgroud) fork-join ×10
java ×9
forkjoinpool ×4
java-8 ×2
angular ×1
java-7 ×1
java-stream ×1
jsr166 ×1
lock-free ×1
observable ×1
rxjs ×1
scala ×1