我的程序通过分而治之的方法搜索问题的解决方案(任何解决方案),使用递归和实现RecursiveTasks
:我为该部门的第一个分支派一个任务,然后递归到第二个分支:如果第二个分支找到了解决方案,然后我取消了第一个分支,否则我等待它的结果.
这可能不是最佳的.如果找到解决方案,一种方法是针对任何已启动的任务抛出异常.但是,我将如何取消所有已启动的任务?取消任务是否也取消所有子任务?
我有一个自定义ForkJoinPool创建的并行度为25.
customForkJoinPool = new ForkJoinPool(25);
Run Code Online (Sandbox Code Playgroud)
我有一个包含700个文件名的列表,我使用这样的代码从S3并行下载文件并将它们转换为Java对象:
customForkJoinPool.submit(() -> {
return fileNames
.parallelStream()
.map((fileName) -> {
Logger log = Logger.getLogger("ForkJoinTest");
long startTime = System.currentTimeMillis();
log.info("Starting job at Thread:" + Thread.currentThread().getName());
MyObject obj = readObjectFromS3(fileName);
long endTime = System.currentTimeMillis();
log.info("completed a job with Latency:" + (endTime - startTime));
return obj;
})
.collect(Collectors.toList);
});
});
Run Code Online (Sandbox Code Playgroud)
当我查看日志时,我看到只使用了5个线程.平行度为25,我预计这将使用25个线程.下载文件并将文件转换为对象的平均延迟时间约为200毫秒.我错过了什么?
可能更好的问题是并行流如何在为其创建线程之前分析原始列表的分割量?在这种情况下,看起来它决定将它拆分5次并停止.
我正在寻找组织良好的信息来源,关于如何使用即将推出的jsr166y(fork-join,fences)和extras166y(ParallelArray等) - 从教程到专家级别.
我正在研究Java 7中新的Fork-Join框架(作为课程要求的一部分),并分析与传统线程机制相比的性能改进.使用新的fork join框架可以保证更快运行的分而治之算法有哪些.你能否提出我可以用来分析性能差异的任何非平凡算法?
我开始学习ExecutorService类.文档(和在线教程)说总是调用ExecutorService.shutDown()来回收资源.但是,文档还说在调用shutDown()之后,不会接受任何新任务.所以,我的问题是,每当我需要并行化数据处理时,我是否始终必须实例化一个新的ExecutorService?
现在我有一个可调用对象列表,我执行以下操作.
public void someMethod() {
List<OuterCallable> outerCallables = getOuterCallables();
ExecutorService executor = Executor.newFixedThreadPool(NUM_CPUS);
executor.invokeAll(tasks);
executor.shutDown();
}
Run Code Online (Sandbox Code Playgroud)
但是,我的OuterCallable还使用InnerCallable并行分割数据或执行数据处理.
public class OuterCallable implements Callable<Long> {
public Long call() throws Exception {
long result = 0L;
List<InnerCallable> innerCallables = getInnerCallables();
ExecutorServices executor = Executor.newFixedThreadPool(NUM_CPUS);
executor.invokeAll(tasks);
executor.shutDown();
return result;
}
}
Run Code Online (Sandbox Code Playgroud)
我不记得它是用于ExecutorService还是Fork/Join方法,但我记得文档和教程说操作数据的实际并行过程不应该涉及I/O操作,一切都应该在内存中完成.但是,在我的InnerCallable中,我实际上正在进行JDBC调用(此处未显示).
最终,我使用ExecutorService的方式有效,但我仍然有一些挥之不去的担忧.
作为最后一个问题,我试图研究一下Fork/Join vs ExecutorService.我遇到了一篇完全抨击Fork/Join API /类的文章.学习Fork/Join值得吗?我在stackoverflow和其他地方看到了一些文章,其中测试用于比较Fork/Join和ExecutorService,并且有图表显示了Fork/Join vs ExecutorService的更好的CPU使用率(通过Windows任务管理器).但是,当我使用ExecutorService(JDK 1.7.x)时,我的CPU使用率是最大值.使用最新的JDK改进了ExecutorService吗?
任何帮助/指导表示赞赏.
我需要产生一套期货,等到所有这些期货都失败或取得一些成功.
最近的Scala 2.10没有包含这样的东西,或者我错过了什么?
我想要的是
我想研究fork/join算法的优化.通过优化,我的意思是计算最佳线程数,或者如果你想要 - 计算SEQUENTIAL_THRESHOLD
(参见下面的代码).
// PSEUDOCODE
Result solve(Problem problem) {
if (problem.size < SEQUENTIAL_THRESHOLD)
return solveSequentially(problem);
else {
Result left, right;
INVOKE-IN-PARALLEL {
left = solve(extractLeftHalf(problem));
right = solve(extractRightHalf(problem));
}
return combine(left, right);
}
}
Run Code Online (Sandbox Code Playgroud)
我怎么想象呢
例如,我想计算大数组的乘积.然后我只评估所有组件并获得最佳线程数量:
SEQUENTIAL_THRESHOLD = PC * IS / MC
(只是例子)
PC
- 处理器核心数量;
IS
- 常量,表示具有一个处理器内核的最佳阵列大小和对数据的最简单操作(例如读取);
MC
- 倍增运营成本;
假设MC = 15; PC = 4且IS = 10000; SEQUENTIAL_THRESHOLD = 2667
.如果子任务数组大于2667,我会分叉它.
广泛的问题
狭义的问题:
是否已经存在关于SEQUENTIAL_THRESHOLD
数组/集合/排序计算的一些调查?他们如何实现这一目标?
2014年3月7日更新:
关注如何在线程池中使用MDC?如何使用MDC ForkJoinPool
?具体来说,我怎么能ForkJoinTask
在执行任务之前设置一个这样的MDC值?
是否可以配置ForkJoinPool
为使用1个执行线程?
我正在执行Random
在一个内部调用的代码ForkJoinPool
.每次运行时,我都会遇到不同的运行时行为,因此很难调查回归.
我希望代码库提供"调试"和"发布"模式."debug"模式将Random
使用固定种子配置,并ForkJoinPool
使用单个执行线程."release"模式将使用系统提供的Random
种子并使用默认的ForkJoinPool
线程数.
我尝试ForkJoinPool
使用1的并行性配置,但它使用2个线程(main
和第二个工作线程).有任何想法吗?
我想在Java 8中试用ForkJoinPool,所以我编写了一个小程序,用于搜索名称中包含给定目录中特定关键字的所有文件.
计划:
public class DirectoryService {
public static void main(String[] args) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
List<String> files = pool.invoke(task);
pool.shutdown();
System.out.println("Total no of files with hello" + files.size());
}
}
class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
private String path;
public FileSearchRecursiveTask(String path) {
this.path = path;
}
@Override
protected List<String> compute() {
File mainDirectory = new File(path);
List<String> filetedFileList = new ArrayList<>();
List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
if(mainDirectory.isDirectory()) {
System.out.println(Thread.currentThread() + " …
Run Code Online (Sandbox Code Playgroud) fork-join ×10
java ×9
concurrency ×4
java-8 ×3
forkjoinpool ×2
future ×1
java-stream ×1
mdc ×1
scala ×1
slf4j ×1