标签: fork-join

Fork-join中的内存可见性

Brian Goetz在http://www.ibm.com/developerworks/java/library/j-jtp03048.html上写了一篇关于fork-join的好文章.在其中,他列出了使用fork-join机制的合并排序算法,在该机制中,他并行执行数组两侧的排序,然后合并结果.

该算法同时对同一阵列的两个不同部分进行排序.为什么不是AtomicIntegerArray或维持可见性所需的其他机制?什么保证一个线程会看到另一个线程完成的写入,或者这是一个微妙的错误?作为后续跟进,Scala的ForkJoinScheduler是否也提供此保证?

谢谢!

java scala fork-join jsr166

6
推荐指数
1
解决办法
1208
查看次数

ForkJoinPool似乎浪费了一个线程

我正在比较测试程序的两个变体.两者都ForkJoinPool在具有四个内核的机器上使用4线程运行.

在'模式1'中,我非常像执行器服务使用池.我把一堆任务扔进去ExecutorService.invokeAll.我获得了比普通的固定线程执行器服务更好的性能(即使有调用Lucene,那里也有一些I/O).

这里没有分而治之的.从字面上看,我做到了

ExecutorService es = new ForkJoinPool(4);
es.invokeAll(collection_of_Callables);
Run Code Online (Sandbox Code Playgroud)

在'模式2'中,我向池中提交单个任务,并在该任务中调用ForkJoinTask.invokeAll来提交子任务.所以,我有一个继承自的对象,RecursiveAction它被提交到池中.在该类的compute方法中,我调用了invokeAll来自不同类的对象集合,这些对象也继承自RecursiveAction.出于测试目的,我只提交第一个对象的一次一个.我天真地期望看到所有四个线程忙什么,因为线程调用invokeAll将为自己抓取一个子任务而不是仅仅坐着和阻塞.我可以想到为什么它可能不会那样工作的一些原因.

在VisualVM中观察,在模式2中,一个线程几乎总是在等待.我希望看到的是调用invokeAll的线程立即开始处理其中一个被调用的任务,而不仅仅是静坐.这肯定比使用普通线程池尝试此方案所导致的死锁更好,但仍然是什么?它是否保留一个线程,以防其他东西被提交?而且,如果是这样,为什么模式1中的问题不同?

到目前为止,我一直使用添加到java 1.6的引导类路径的jsr166 jar来运行它.

java fork-join java-7

6
推荐指数
1
解决办法
3046
查看次数

ForkJoinPool在invokeAll/join期间停止

我尝试使用ForkJoinPool 来并行化我的CPU密集型计算.我对ForkJoinPool的理解是,只要任何任务可以执行,它就会继续工作.不幸的是,我经常观察工作线程空闲/等待,因此并非所有CPU都保持忙碌状态.有时我甚至观察到额外的工作线程.

我没想到这一点,因为我严格尝试使用非阻塞任务.我的观察非常类似于ForkJoinPool似乎浪费了一个线程.在对ForkJoinPool进行了大量调试之后我猜了一下:

我使用invokeAll()在子任务列表上分配工作.在invokeAll()完成后执行第一个任务本身,它开始加入其他任务.这很好,直到下一个要连接的任务位于执行队列之上.不幸的是,我提交了异步的其他任务而没有加入它们.我期望ForkJoin框架首先继续执行这些任务,然后再转回加入任何剩余的任务.

但它似乎不是这样工作的.相反,工作线程停止调用wait()直到等待的任务准备好(可能是由其他工作线程执行).我没有验证这一点,但似乎是调用join()的一般缺陷.

ForkJoinPool提供了一个asyncMode,但这是一个全局参数,不能用于单个提交.但我喜欢看到我的异步分叉任务很快就会被执行.

那么,为什么ForkJoinTask.doJoin()不是简单地在其队列之上执行任何可用任务,直到它准备好(由自己执行或被其他人窃取)?

java lock-free java.util.concurrent fork-join

6
推荐指数
1
解决办法
3072
查看次数

Iterative Fork-Join用于分治的基本情况

我有一个递归的分治算法,在开始分割之前需要两个计算密集的基本案例任务.最初的基本案例是独立的任务,所以我想并行完成.在基本情况之后,除法运行相同的任务,在0和1之间输入不同的输入,并根据输出决定是否再次拆分.我通过创建一个伪造递归的任务包装器对象来使基本案例工作,但这感觉就像一个kludge,如下所示:

public static void doSomething () {
    ForkJoinPool pool = new ForkJoinPool();
    private ArrayList<Object> al = new ArrayList<Object>();
    TaskWrapper tw = new TaskWrapper(true,-1);

    al.addAll(pool.invoke(tw));
}

@SuppressWarnings("serial")
public static class TaskWrapper extends RecursiveTask<ArrayList<Object>> {
    private ArrayList<Object> al = new ArrayList<Object>();
    private boolean arg;
    private double input;
    private Object out;

    TaskWrapper(boolean ar, double in){
        arg = ar;
        input = in;
    }

    @Override
    public ArrayList<Object> compute() {
        if (arg == false) {
            out = new Object(runIntensiveTask(input));
            al.add(out);
        }
        else {
            // Right Base …
Run Code Online (Sandbox Code Playgroud)

java multithreading java.util.concurrent fork-join forkjoinpool

6
推荐指数
1
解决办法
315
查看次数

现在的工人是否参与偷工作?

在a中ForkJoinPool ForkJoinTask,当前工作线程是否参与工作窃取?

我已经读到了fork连接池可以从阻塞或等待线程中窃取的含义.目前的工人似乎是一个明显的候选人 一旦工作人员调用.join()另一个任务,那么该任务基本上被阻止.

另一方面,我看到许多文章暗示了不同的结论.例如,当前工作者线程在等待分叉任务之前应该工作的普遍共识.

有迹象表明,讨论使用的几篇文章ForkJoinTask.getSurplusQueuedTaskCount,通过具有当前工人平衡在队列中的工作方法做了一些工作.如果当前工人也在偷窃,那么这似乎没有必要.

当然,我希望最大化线程操作并使所有工作者最大限度地运行.了解当前线程是否也窃取工作(例如何时.join被调用)将有助于澄清.

java multithreading fork-join

6
推荐指数
1
解决办法
99
查看次数

如何配置单线程ForkJoinPool?

是否可以配置ForkJoinPool为使用1个执行线程?

我正在执行Random在一个内部调用的代码ForkJoinPool.每次运行时,我都会遇到不同的运行时行为,因此很难调查回归.

我希望代码库提供"调试"和"发布"模式."debug"模式将Random使用固定种子配置,并ForkJoinPool使用单个执行线程."release"模式将使用系统提供的Random种子并使用默认的ForkJoinPool线程数.

我尝试ForkJoinPool使用1的并行性配置,但它使用2个线程(main和第二个工作线程).有任何想法吗?

java multithreading fork-join forkjoinpool

6
推荐指数
1
解决办法
1333
查看次数

顺序订阅一系列可观察对象

在这里,我使用forkJoinrxjs来并行地订阅一个可观察对象数组。但是我要一个一个地订阅,什么是最好的解决方案?

下面是我的代码:

var observables = [];

Observable.forkJoin(observables)
    .subscribe(() => {
        this.msgs = [];
        this.msgs.push({
            severity: 'success',
            summary: 'Saved Successfully'
        });
        this.onSaveComplete();
    }, (error: any) => this.errorMessage = <any>error);
}, (error: any) => this.errorMessage = <any>error);
Run Code Online (Sandbox Code Playgroud)

fork-join observable rxjs angular

6
推荐指数
1
解决办法
2907
查看次数

使用fork/join可以跨线程边界安全地移植非线程安全值吗?

我有一些不是线程安全的类:

class ThreadUnsafeClass {
  long i;

  long incrementAndGet() { return ++i; }
}
Run Code Online (Sandbox Code Playgroud)

(我在long这里使用了a 作为字段,但我们应该将其字段视为一些线程不安全的类型).

我现在有一个看起来像这样的课程

class Foo {
  final ThreadUnsafeClass c;

  Foo(ThreadUnsafeClass c) {
    this.c = c;
  }
}
Run Code Online (Sandbox Code Playgroud)

也就是说,线程不安全类是它的最后一个字段.现在我要这样做:

public class JavaMM {
  public static void main(String[] args) {
    final ForkJoinTask<ThreadUnsafeClass> work = ForkJoinTask.adapt(() -> {
      ThreadUnsafeClass t = new ThreadUnsafeClass();
      t.incrementAndGet();
      return new FC(t);
    });

    assert (work.fork().join().c.i == 1); 
  }
}
Run Code Online (Sandbox Code Playgroud)

也就是说,从thread T(main),我调用了一些工作T'(fork-join-pool),它创建并改变了我的不安全类的实例,然后返回包含在a中的结果Foo.请注意,我的线程不安全类的所有变异都发生在一个线程上T'.

问题1:我是否保证thread-unsafe-class实例的结束状态安全地移植到?的 …

java parallel-processing fork-join java-memory-model java-stream

6
推荐指数
1
解决办法
237
查看次数

ForkJoinPool - 为什么程序抛出OutOfMemoryError?

我想在Java 8中试用ForkJoinPool,所以我编写了一个小程序,用于搜索名称中包含给定目录中特定关键字的所有文件.

计划:

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total  no of files with hello" + files.size());
    }

}

    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private String path;
        public FileSearchRecursiveTask(String path) {
            this.path = path;
        }

        @Override
        protected List<String> compute() {
            File mainDirectory = new File(path);
            List<String> filetedFileList = new ArrayList<>();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            if(mainDirectory.isDirectory()) {
                System.out.println(Thread.currentThread() + " …
Run Code Online (Sandbox Code Playgroud)

java multithreading fork-join java-8 forkjoinpool

6
推荐指数
1
解决办法
846
查看次数

为什么ForkJoinPool :: invoke()会阻塞主线程?

免责声明:这是我第一次使用Java的Fork-Join框架,所以我并不是100%确定我正确使用它.Java也不是我的主要编程语言,所以这也可能是相关的.


鉴于以下SSCCE:

import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

class ForkCalculator extends RecursiveAction
{
    private final Integer[] delayTasks;

    public ForkCalculator(Integer[] delayTasks)
    {
        this.delayTasks = delayTasks;
    }

    @Override
    protected void compute()
    {
        if (this.delayTasks.length == 1) {
            this.computeDirectly();
            return;
        }

        Integer halfway = this.delayTasks.length / 2;

        ForkJoinTask.invokeAll(
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, 0, halfway)
            ),
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
            )
        );
    }

    private void computeDirectly()
    {
        Integer delayTask = this.delayTasks[0];

        try {
            Thread.sleep(delayTask);
        } catch …
Run Code Online (Sandbox Code Playgroud)

java fork-join java-8 forkjoinpool

6
推荐指数
1
解决办法
545
查看次数