标签: fork-join

在Java 7+ ForkJoinPool中,是否可以取消任务和所有子任务?

我的程序通过分而治之的方法搜索问题的解决方案(任何解决方案),使用递归和实现RecursiveTasks:我为该部门的第一个分支派一个任务,然后递归到第二个分支:如果第二个分支找到了解决方案,然后我取消了第一个分支,否则我等待它的结果.

这可能不是最佳的.如果找到解决方案,一种方法是针对任何已启动的任务抛出异常.但是,我将如何取消所有已启动的任务?取消任务是否也取消所有子任务?

java java.util.concurrent fork-join

8
推荐指数
1
解决办法
2694
查看次数

为什么parallelStream不使用整个可用的并行性?

我有一个自定义ForkJoinPool创建的并行度为25.

customForkJoinPool = new ForkJoinPool(25);
Run Code Online (Sandbox Code Playgroud)

我有一个包含700个文件名的列表,我使用这样的代码从S3并行下载文件并将它们转换为Java对象:

customForkJoinPool.submit(() -> {
   return fileNames
     .parallelStream()
     .map((fileName) -> {
        Logger log = Logger.getLogger("ForkJoinTest");
        long startTime = System.currentTimeMillis();
        log.info("Starting job at Thread:" + Thread.currentThread().getName());
        MyObject obj = readObjectFromS3(fileName);
        long endTime = System.currentTimeMillis();
        log.info("completed a job with Latency:" + (endTime - startTime));
        return obj;
     })
     .collect(Collectors.toList);
   });
});
Run Code Online (Sandbox Code Playgroud)

当我查看日志时,我看到只使用了5个线程.平行度为25,我预计这将使用25个线程.下载文件并将文件转换为对象的平均延迟时间约为200毫秒.我错过了什么?

可能更好的问题是并行流如何在为其创建线程之前分析原始列表的分割量?在这种情况下,看起来它决定将它拆分5次并停止.

java multithreading fork-join java-8 java-stream

8
推荐指数
2
解决办法
292
查看次数

即将到来的fork-join框架上的资源

我正在寻找组织良好的信息来源,关于如何使用即将推出的jsr166y(fork-join,fences)和extras166y(ParallelArray等) - 从教程到专家级别.

java concurrency fork-join

7
推荐指数
1
解决办法
1338
查看次数

Java fork连接算法分析

我正在研究Java 7中新的Fork-Join框架(作为课程要求的一部分),并分析与传统线程机制相比的性能改进.使用新的fork join框架可以保证更快运行的分而治之算法有哪些.你能否提出我可以用来分析性能差异的任何非平凡算法?

java concurrency fork-join

7
推荐指数
1
解决办法
955
查看次数

在java中调用ExecutorService.shutDown()

我开始学习ExecutorService类.文档(和在线教程)说总是调用ExecutorService.shutDown()来回收资源.但是,文档还说在调用shutDown()之后,不会接受任何新任务.所以,我的问题是,每当我需要并行化数据处理时,我是否始终必须实例化一个新的ExecutorService?

现在我有一个可调用对象列表,我执行以下操作.

public void someMethod() {
 List<OuterCallable> outerCallables = getOuterCallables();
 ExecutorService executor = Executor.newFixedThreadPool(NUM_CPUS);
 executor.invokeAll(tasks);
 executor.shutDown();
}
Run Code Online (Sandbox Code Playgroud)

但是,我的OuterCallable还使用InnerCallable并行分割数据或执行数据处理.

public class OuterCallable implements Callable<Long> {
 public Long call() throws Exception {
  long result = 0L;

  List<InnerCallable> innerCallables = getInnerCallables();
  ExecutorServices executor = Executor.newFixedThreadPool(NUM_CPUS);
  executor.invokeAll(tasks);
  executor.shutDown();

  return result;
 }
}    
Run Code Online (Sandbox Code Playgroud)

我不记得它是用于ExecutorService还是Fork/Join方法,但我记得文档和教程说操作数据的实际并行过程不应该涉及I/O操作,一切都应该在内存中完成.但是,在我的InnerCallable中,我实际上正在进行JDBC调用(此处未显示).

最终,我使用ExecutorService的方式有效,但我仍然有一些挥之不去的担忧.

  1. 我的方法是使用ExecutorService进行良好的编程实践吗?
  2. 我应该使用ExecutorService的单例实例吗?
  3. 我不仅应该避免并行方法中的I/O操作,还要避免JDBC调用吗?

作为最后一个问题,我试图研究一下Fork/Join vs ExecutorService.我遇到了一篇完全抨击Fork/Join API /类的文章.学习Fork/Join值得吗?我在stackoverflow和其他地方看到了一些文章,其中测试用于比较Fork/Join和ExecutorService,并且有图表显示了Fork/Join vs ExecutorService的更好的CPU使用率(通过Windows任务管理器).但是,当我使用ExecutorService(JDK 1.7.x)时,我的CPU使用率是最大值.使用最新的JDK改进了ExecutorService吗?

任何帮助/指导表示赞赏.

java multithreading executorservice fork-join

7
推荐指数
1
解决办法
2978
查看次数

Futures.await在Scala 2.10中全部替换

我需要产生一套期货,等到所有这些期货都失败或取得一些成功.

最近的Scala 2.10没有包含这样的东西,或者我错过了什么?

concurrency scala future fork-join

7
推荐指数
1
解决办法
827
查看次数

Fork加入优化

我想要的是

我想研究fork/join算法的优化.通过优化,我的意思是计算最佳线程数,或者如果你想要 - 计算SEQUENTIAL_THRESHOLD(参见下面的代码).

// PSEUDOCODE
Result solve(Problem problem) { 
    if (problem.size < SEQUENTIAL_THRESHOLD)
        return solveSequentially(problem);
    else {
        Result left, right;
        INVOKE-IN-PARALLEL { 
            left = solve(extractLeftHalf(problem));
            right = solve(extractRightHalf(problem));
        }
        return combine(left, right);
    }
}
Run Code Online (Sandbox Code Playgroud)

我怎么想象呢

例如,我想计算大数组的乘积.然后我只评估所有组件并获得最佳线程数量:

SEQUENTIAL_THRESHOLD = PC * IS / MC (只是例子)

PC - 处理器核心数量; IS - 常量,表示具有一个处理器内核的最佳阵列大小和对数据的最简单操作(例如读取); MC - 倍增运营成本;

假设MC = 15; PC = 4且IS = 10000; SEQUENTIAL_THRESHOLD = 2667.如果子任务数组大于2667,我会分叉它.

广泛的问题

  1. 是否有可能以这种方式制作SEQUENTIAL_THRESHOLD公式?
  2. 是否有可能为更复杂的计算完成相同的工作:不仅是对数组/集合和排序的操作?

狭义的问题:

是否已经存在关于SEQUENTIAL_THRESHOLD数组/集合/排序计算的一些调查?他们如何实现这一目标?

2014年3月7日更新:

  1. 如果无法为阈值计算编写单个公式,我可以编写一个将在PC上执行预定义测试的util,并获得最佳阈值吗?这也不可能吗?
  2. Java 8 Streams …

java concurrency multithreading fork-join java-8

7
推荐指数
2
解决办法
1126
查看次数

如何在ForkJoinPool中使用MDC?

关注如何在线程池中使用MDC?如何使用MDC ForkJoinPool?具体来说,我怎么能ForkJoinTask在执行任务之前设置一个这样的MDC值?

java slf4j mdc fork-join

7
推荐指数
1
解决办法
2108
查看次数

如何配置单线程ForkJoinPool?

是否可以配置ForkJoinPool为使用1个执行线程?

我正在执行Random在一个内部调用的代码ForkJoinPool.每次运行时,我都会遇到不同的运行时行为,因此很难调查回归.

我希望代码库提供"调试"和"发布"模式."debug"模式将Random使用固定种子配置,并ForkJoinPool使用单个执行线程."release"模式将使用系统提供的Random种子并使用默认的ForkJoinPool线程数.

我尝试ForkJoinPool使用1的并行性配置,但它使用2个线程(main和第二个工作线程).有任何想法吗?

java multithreading fork-join forkjoinpool

6
推荐指数
1
解决办法
1333
查看次数

ForkJoinPool - 为什么程序抛出OutOfMemoryError?

我想在Java 8中试用ForkJoinPool,所以我编写了一个小程序,用于搜索名称中包含给定目录中特定关键字的所有文件.

计划:

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total  no of files with hello" + files.size());
    }

}

    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private String path;
        public FileSearchRecursiveTask(String path) {
            this.path = path;
        }

        @Override
        protected List<String> compute() {
            File mainDirectory = new File(path);
            List<String> filetedFileList = new ArrayList<>();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            if(mainDirectory.isDirectory()) {
                System.out.println(Thread.currentThread() + " …
Run Code Online (Sandbox Code Playgroud)

java multithreading fork-join java-8 forkjoinpool

6
推荐指数
1
解决办法
846
查看次数