Rad*_*zea 6 java fork-join java-8 forkjoinpool
免责声明:这是我第一次使用Java的Fork-Join框架,所以我并不是100%确定我正确使用它.Java也不是我的主要编程语言,所以这也可能是相关的.
鉴于以下SSCCE:
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
class ForkCalculator extends RecursiveAction
{
private final Integer[] delayTasks;
public ForkCalculator(Integer[] delayTasks)
{
this.delayTasks = delayTasks;
}
@Override
protected void compute()
{
if (this.delayTasks.length == 1) {
this.computeDirectly();
return;
}
Integer halfway = this.delayTasks.length / 2;
ForkJoinTask.invokeAll(
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, 0, halfway)
),
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
)
);
}
private void computeDirectly()
{
Integer delayTask = this.delayTasks[0];
try {
Thread.sleep(delayTask);
} catch (InterruptedException ex) {
System.err.println(ex.getMessage());
System.exit(2);
}
System.out.println("Finished computing task with delay " + delayTask);
}
}
public final class ForkJoinBlocker
{
public static void main(String[] args)
{
ForkCalculator calculator = new ForkCalculator(
new Integer[]{1500, 1400, 1950, 2399, 4670, 880, 5540, 1975, 3010, 4180, 2290, 1940, 510}
);
ForkJoinPool pool = new ForkJoinPool(
Runtime.getRuntime().availableProcessors()
);
pool.invoke(calculator);
//make it a daemon thread
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(
new TimerTask() {
@Override
public void run()
{
System.out.println(pool.toString());
}
},
100,
2000
);
}
}
Run Code Online (Sandbox Code Playgroud)
所以我创建了一个ForkJoinPool
我提交了一些处理的任务.Thread.sleep()
为了这个例子的目的,我替换它们,以保持简单.
在我的实际程序中,这是一个很长的任务列表,所以我想定期在标准输出上打印当前状态.我尝试使用计划在单独的线程上执行此操作TimerTask
.
但是,我注意到了一些我没想到的东西:在我的例子中,输出是这样的:
Finished computing task with delay 1500
Finished computing task with delay 2399
Finished computing task with delay 1400
Finished computing task with delay 4180
Finished computing task with delay 1950
Finished computing task with delay 5540
Finished computing task with delay 880
.......
Run Code Online (Sandbox Code Playgroud)
这意味着永远不会执行"状态任务".
但是,如果我修改我的代码以pool.invoke(calculator);
在最后移动,那么它按预期工作:
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 1500
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 2399
Finished computing task with delay 1400
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 4, submissions = 0]
Finished computing task with delay 4180
Finished computing task with delay 1950
......
Run Code Online (Sandbox Code Playgroud)
我可以得出的唯一结论是ForkJoinPool::invoke()
阻塞主线程(它只在完成池中的所有任务后才返回).
我希望主线程中的代码继续执行,而fork-join-pool中的任务是异步处理的.
我的问题是:这是否发生是因为我错误地使用了框架?我的代码中有什么必须纠正的吗?
我注意到其中一个ForkJoinPool
构造函数有一个boolean asyncMode
参数但是,从我从实现中可以看出,这只是决定FIFO_QUEUE
和LIFO_QUEUE
执行模式之间(不完全确定它们是什么):
public ForkJoinPool(
int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode
) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
Run Code Online (Sandbox Code Playgroud)
基本上invoke()
会等待整个任务完成后再返回,所以是的,主线程正在阻塞。之后,它Timer
没有时间执行,因为它在守护线程上运行。
您可以简单地使用execute()
which 来invoke()
异步运行任务。然后您可以join()
等待ForkJoinTask
结果,在此期间它将Timer
运行:
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.execute(calculator);
//make it a daemon thread
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
System.out.println(pool.toString());
}
}, 100, 2000);
calculator.join(); // wait for computation
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
545 次 |
最近记录: |