Java ForkJoinPool - 队列中的任务顺序

Ken*_*ong 9 java multithreading forkjoinpool

我想了解Java fork-join池中处理任务的顺序.

到目前为止,我在文档中找到的唯一相关信息是关于一个名为"asyncMode"的参数,如果此池对从未加入的分叉任务使用本地先进先出调度模式,则为"true" .

我对这个陈述的解释是每个工人都有自己的任务队列; 工人从他们自己的队列前面接受任务,或者如果他们自己的队列是空的,就偷走其他工人队列的后面; 如果asyncMode为true(resp.false),worker会将新分叉的任务添加到自己队列的后面(resp.front).

如果我的解释错误,请纠正我!

现在,这提出了几个问题:

1)加入的分叉任务的顺序什么?

我的猜测是,当一个任务被分叉时,它会被添加到工作者的队列中,如上面的解释所述.现在,假设任务已加入......

  • 如果在调用join时,任务尚未启动,则工作者调用join将把任务拉出队列并立即开始处理它.

  • 如果在调用join时,该任务已经被另一个worker偷走了,那么调用join的worker将同时处理其他任务(按照我上面的解释中描述的获取任务的顺序),直到它的任务为止.加入已被偷走的工人完成了.

这种猜测基于使用print语句编写简单的测试代码,并观察改变连接调用顺序的方式会影响处理任务的顺序.有人可以告诉我,我的猜测是否正确?

2)外部提交的任务的顺序是什么?

根据此问题的答案,fork-join池不使用外部队列.(顺便说一下,我正在使用Java 8.)

所以我要理解,当外部提交任务时,任务会被添加到随机选择的工作队列中吗?

如果是这样,外部提交的任务是否添加到队列的后面或前面?

最后,这取决于是通过调用pool.execute(任务)还是通过调用pool.invoke(task)来提交任务?这取决于调用pool.execute(task)或pool.invoke(task)的线程是外部线程还是此fork-join池中的线程?

Gal*_*l S 3

  1. 你的猜测是正确的,你完全正确。正如您可以在“实施概述”中阅读的那样。
 * Joining Tasks
 * =============
 *
 * Any of several actions may be taken when one worker is waiting
 * to join a task stolen (or always held) by another.  Because we
 * are multiplexing many tasks on to a pool of workers, we can't
 * just let them block (as in Thread.join).  We also cannot just
 * reassign the joiner's run-time stack with another and replace
 * it later, which would be a form of "continuation", that even if
 * possible is not necessarily a good idea since we may need both
 * an unblocked task and its continuation to progress.  Instead we
 * combine two tactics:
 *
 *   Helping: Arranging for the joiner to execute some task that it
 *      would be running if the steal had not occurred.
 *
 *   Compensating: Unless there are already enough live threads,
 *      method tryCompensate() may create or re-activate a spare
 *      thread to compensate for blocked joiners until they unblock.
Run Code Online (Sandbox Code Playgroud)

2.ForkJoinPool.invoke和ForkJoinPool.join的任务提交方式是完全一样的。代码中可以看到

    public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task.join();
    }
    public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
    }
Run Code Online (Sandbox Code Playgroud)

在externalPush中,您可以看到任务被添加到使用ThreadLocalRandom随机选择的工作队列中。而且,它是通过push方式进入队列头部的。

    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
        int r = ThreadLocalRandom.getProbe();
        int rs = runState;
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                    int j = ((am & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s + 1);
                U.putIntVolatile(q, QLOCK, 0);
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        externalSubmit(task);
    }
Run Code Online (Sandbox Code Playgroud)

我不确定你的意思是什么:

这是否取决于调用 pool.execute(task) 或 pool.invoke(task) 的线程是外部线程还是此 fork-join 池中的线程?