fork/join框架如何比线程池更好?

Joo*_*kka 128 java fork-join

使用新的fork/join框架而不仅仅是在开始时将大任务分成N个子任务,将它们发送到缓存的线程池(来自Executors)并等待每个任务完成,有什么好处?我没有看到使用fork/join抽象如何简化问题或使解决方案从我们多年来的工作中提高效率.

例如,教程示例中的并行化模糊算法可以像这样实现:

public class Blur implements Runnable {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    public void run() {
        computeDirectly();
    }

    protected void computeDirectly() {
        // As in the example, omitted for brevity
    }
}
Run Code Online (Sandbox Code Playgroud)

在开头拆分并将任务​​发送到线程池:

// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool

int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future> futures = new ArrayList<Future>();

// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
    int size = Math.min(maxSize, src.length - i);
    ForkBlur task = new ForkBlur(src, i, size, dst);
    Future f = threadPool.submit(task);
    futures.add(f);
}

// Wait for all sent tasks to complete:
for (Future future : futures) {
    future.get();
}

// Done!
Run Code Online (Sandbox Code Playgroud)

任务转到线程池的队列,当工作线程可用时,它们将从该队列执行.只要拆分足够精细(以避免必须特别等待最后一个任务)并且线程池有足够的(至少N个处理器)线程,所有处理器都在全速工作,直到整个计算完成.

我错过了什么吗?使用fork/join框架的附加价值是什么?

A.H*_*.H. 131

我认为基本的误解是,Fork/Join示例显示工作窃取,而只是某种标准的分而治之.

工作窃取将是这样的:工人B已完成他的工作.他是一个善良的人,所以他环顾四周,看到工人A仍然非常努力.他漫步并问道:"嘿小伙子,我可以帮你一把." 回复."很酷,我有1000个单位的任务.到目前为止,我已经完成了345个离开655.你能不能在673到1000号工作,我会做346到672个." B说"好的,我们先来吧,我们可以早点去酒吧."

你看 - 即使他们开始真正的工作,工人也必须彼此沟通.这是示例中缺少的部分.

另一方面,示例仅显示"使用分包商"之类的内容:

工人A:"Dang,我有1000个单位的工作.对我来说太多了.我自己会做500个并将500个转包给其他人." 这种情况一直持续到大任务被分解为每个10个单元的小包.这些将由可用的工作人员执行.但是,如果一个包是一种毒丸并且需要比其他包更长的时间 - 运气不好,分裂阶段就结束了.

Fork/Join和预先拆分任务之间唯一的区别是:在前期拆分时,您可以从一开始就完整地处理工作队列.示例:1000个单位,阈值为10,因此队列有100个条目.这些数据包被分发给线程池成员.

Fork/Join更复杂,并尝试将队列中的数据包数量保持较小:

  • 步骤1:将包含(1 ... 1000)的一个数据包放入队列
  • 步骤2:一个工作人员弹出数据包(1 ... 1000)并用两个数据包替换它:(1 ... 500)和(501 ... 1000).
  • 步骤3:一名工作人员弹出数据包(500 ... 1000)并推送(500 ... 750)和(751 ... 1000).
  • 步骤n:堆栈包含以下数据包:(1..500),(500 ... 750),(750 ... 875)...(991..1000)
  • 步骤n + 1:弹出并执行包(991..1000)
  • 步骤n + 2:弹出并执行包(981..990)
  • 步骤n + 3:弹出包(961..980)并分成(961 ... 970)和(971..980).....

您会看到:在Fork/Join中,队列较小(示例中为6),并且"split"和"work"阶段是交错的.

当多个工人同时弹出和推动时,互动当然不是那么清楚.

  • Oracle的例子IMO的问题并不是它没有证明工作窃取(它确实如AH所描述的那样),而是很容易为一个简单的ThreadPool编写算法(就像Joonas所做的那样).当工作不能预先分成足够的独立任务时,FJ最有用,但可以递归地分成独立的任务.请参阅我的答案以获取示例 (6认同)
  • 偷工作的一些例子可能派上用场:http://www.h-online.com/developer/features/The-fork-join-framework-in-Java-7-1762357.html (2认同)

Tom*_*ine 25

如果你有n个繁忙线程都是100%独立工作,那么这将比Fork-Join(FJ)池中的n个线程更好.但它从来没有这样做过.

可能无法将问题精确地分成n个相等的部分.即使你这样做,线程调度也是不公平的.你最终会等待最慢的线程.如果你有多个任务,那么它们每个都可以以低于n路的并行性运行(通常效率更高),但是当其他任务完成时,它会向上运行.

那么为什么我们不把问题简化为FJ大小的部分并且有一个线程池工作.典型的FJ使用将问题分解成小块.以随机顺序执行这些操作需要在硬件级别进行大量协调.开销将是一个杀手.在FJ中,任务被放入队列中,线程以后进先出顺序(LIFO /堆栈)读取,并且工作窃取(通常在核心工作中)先进先出(FIFO /"队列").结果是长阵列处理可以在很大程度上顺序完成,即使它被分成很小的块.(同样的情况是,在一次大爆炸中将问题分解成小的均匀大小的块可能并不是一件轻而易举的事.假设在没有平衡的情况下处理某种形式的层次结构.)

结论:FJ允许在不平衡的情况下更有效地使用硬件线程,如果您有多个线程,则总是如此.


yan*_*kee 15

线程池和Fork/Join的最终目标是相似的:两者都希望尽可能利用可用的CPU功率来实现最大吞吐量.最大吞吐量意味着应该在很长一段时间内完成尽可能多的任务.需要做什么?(对于以下内容,我们假设计算任务并不缺乏:100%的CPU利用率总是足够.另外,在超线程的情况下,我使用"CPU"等效于核心或虚拟核心).

  1. 至少需要尽可能多的线程运行,因为有可用的CPU,因为运行较少的线程将使核心未使用.
  2. 最多必须有尽可能多的线程运行,因为有可用的CPU,因为运行更多的线程将为调度程序创建额外的负载,调度程序将CPU分配给不同的线程,这会导致一些CPU时间转到调度程序而不是我们的计算任务.

因此,我们发现,为了获得最大吞吐量,我们需要拥有与CPU完全相同的线程数.在Oracle的模糊示例中,您既可以使用固定大小的线程池,其线程数等于可用CPU的数量,也可以使用线程池.它没有区别,你是对的!

那么什么时候你会遇到线程池的麻烦?也就是说,如果一个线程阻塞,因为你的线程正在等待另一个任务完成.假设以下示例:

class AbcAlgorithm implements Runnable {
    public void run() {
        Future<StepAResult> aFuture = threadPool.submit(new ATask());
        StepBResult bResult = stepB();
        StepAResult aResult = aFuture.get();
        stepC(aResult, bResult);
    }
}
Run Code Online (Sandbox Code Playgroud)

我们在这里看到的是一个由三个步骤A,B和C组成的算法.A和B可以彼此独立地执行,但是步骤C需要步骤A和B的结果.这个算法的作用是将任务A提交给线程池并直接执行任务b.之后,线程也将等待任务A完成并继续执行步骤C.如果A和B同时完成,那么一切都很好.但是,如果A需要比B更长的时间呢?这可能是因为任务A的性质决定了它,但也可能是这种情况,因为任务A的开头没有可用的线程,任务A需要等待.(如果只有一个CPU可用,因此你的线程池只有一个线程,这甚至会导致死锁,但现在除了这一点之外).关键是刚执行任务B 的线程阻塞整个线程.由于我们拥有与CPU相同数量的线程,并且一个线程被阻塞,这意味着一个CPU处于空闲状态.

Fork/Join解决了这个问题:在fork/join框架中,您将编写相同的算法,如下所示:

class AbcAlgorithm implements Runnable {
    public void run() {
        ATask aTask = new ATask());
        aTask.fork();
        StepBResult bResult = stepB();
        StepAResult aResult = aTask.join();
        stepC(aResult, bResult);
    }
}
Run Code Online (Sandbox Code Playgroud)

看起来一样,不是吗?然而,线索是aTask.join 不会阻止的.相反,这里是工作窃取发挥作用的地方:线程将环顾四周过去已经分叉的其他任务,并将继续这些任务.首先,它检查它已经分叉的任务是否已经开始处理.因此,如果A尚未被另一个线程启动,它将执行A next,否则它将检查其他线程的队列并窃取他们的工作.一旦另一个线程的另一个任务完成,它将检查A是否现在完成.如果是以上算法可以调用stepC.否则它将寻找另一个偷窃的任务.因此,即使遇到阻塞操作,fork/join池也可以实现100%的CPU利用率.

但是有一个陷阱:工作窃取只能用于s 的join调用ForkJoinTask.无法对外部阻塞操作(如等待另一个线程或等待I/O操作)执行此操作.那么等待I/O完成是一项常见的任务呢?在这种情况下,如果我们可以向Fork/Join池添加一个额外的线程,一旦阻塞操作完成就会再次停止,这将是第二个最好的事情.而ForkJoinPool实际上可以做到这一点,如果我们使用的是ManagedBlocker秒.

斐波那契

JavaDoc for RecursiveTask中,使用Fork/Join计算Fibonacci数.对于经典的递归解决方案,请参阅:

public static int fib(int n) {
    if (n <= 1) {
        return n;
    }
    return fib(n - 1) + fib(n - 2);
}
Run Code Online (Sandbox Code Playgroud)

正如在JavaDocs中所解释的,这是计算斐波纳契数的一种非常好的转储方法,因为该算法具有O(2 ^ n)复杂度,而更简单的方法是可能的.但是这个算法非常简单易懂,所以我们坚持使用它.让我们假设我们想要使用Fork/Join加快速度.一个天真的实现看起来像这样:

class Fibonacci extends RecursiveTask<Long> {
    private final long n;

    Fibonacci(long n) {
        this.n = n;
    }

    public Long compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
   }
}
Run Code Online (Sandbox Code Playgroud)

这个任务被拆分的步骤太短,因此会执行得非常糟糕,但你可以看到框架通常如何工作得很好:两个加法可以独立计算,但是我们需要它们两个来构建最终的结果.所以一半是在另一个线程中完成的.在线程池中做同样的事情而不会出现死锁(可能,但不是那么简单).

只是为了完整性:如果您真的想使用这种递归方法计算斐波纳契数,这里是一个优化版本:

class FibonacciBigSubtasks extends RecursiveTask<Long> {
    private final long n;

    FibonacciBigSubtasks(long n) {
        this.n = n;
    }

    public Long compute() {
        return fib(n);
    }

    private long fib(long n) {
        if (n <= 1) {
            return 1;
        }
        if (n > 10 && getSurplusQueuedTaskCount() < 2) {
            final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
            final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
            f1.fork();
            return f2.compute() + f1.join();
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这使得子任务保持更小,因为它们仅在n > 10 && getSurplusQueuedTaskCount() < 2为true 时被拆分,这意味着对do(n > 10)的方法调用有超过100个,并且没有非常的人工任务已经在等待(getSurplusQueuedTaskCount() < 2).

在我的计算机上(4核(计数超线程时为8核),英特尔(R)核心(TM)i7-2720QM CPU @ 2.20GHz)fib(50)采用经典方法需要64秒,使用Fork/Join方法只需18秒虽然没有理论上可能的那么多,但这是一个显着的增益.

摘要

  • 是的,在您的示例中,Fork/Join没有经典线程池的优势.
  • 在涉及阻塞时,Fork/Join可以显着提高性能
  • Fork/Join可以避免一些死锁问题


Mat*_*ell 14

Fork/join与线程池不同,因为它实现了工作窃取.来自Fork/Join

与任何ExecutorService一样,fork/join框架将任务分配给线程池中的工作线程.fork/join框架是不同的,因为它使用了工作窃取算法.用尽要做的事情的工作线程可以从仍然忙碌的其他线程中窃取任务.

假设你有两个线程,4个任务a,b,c,d分别需要1,1,5和6秒.最初,a和b分配给线程1,c和d分配给线程2.在线程池中,这将花费11秒.使用fork/join,线程1完成并可以从线程2中窃取工作,因此任务d最终将由线程1执行.线程1执行a,b和d,线程2只是c.总时间:8秒,而不是11秒.

编辑:正如Joonas指出的那样,任务不一定预先分配给一个线程.fork/join的想法是线程可以选择将任务拆分为多个子块.所以要重申以上内容:

我们有两个任务(ab)和(cd),分别需要2秒和11秒.线程1开始执行ab并将其分成两个子任务a和b.与线程2类似,它分为两个子任务c&d.当线程1完成a&b时,它可以从线程2中窃取d.

  • 线程池通常是[ThreadPoolExecutor](http://download.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html)实例.在这样的任务中,任务就是*队列*([BlockingQueue](http://download.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html)在实践中)线程在完成上一个任务后立即执行任务.据我所知,任务*不会*预先分配给特定的线程.每个线程一次(最多)一个任务. (4认同)
  • AFAIK有一个_one_ Queue for _one_ ThreadPoolExecutor,它反过来控制_several_ Threads.这意味着将任务或Runnables(而不是Threads!)分配给执行程序,任务也不会预先分配给特定的Threads.正如FJ所做的那样.到目前为止,使用FJ没有任何好处. (4认同)
  • @Matthew Farwell:我可能不了解FJ的所有内容,但是如果一个子任务决定执行它的`computeDirectly()`方法,那就再也无法窃取任何东西了.至少在该示例中,整个分裂是先验*完成的. (2认同)

iai*_*ain 13

上面的每个人都是正确的,通过工作窃取可以获得好处,但要扩展其原因.

主要好处是工作线程之间的有效协调.工作必须分开并重新组装,这需要协调.正如您在AH的答案中所见,每个线程都有自己的工作清单.此列表的一个重要属性是它已排序(顶部的大型任务和底部的小型任务).每个线程执行其列表底部的任务,并从其他线程列表的顶部窃取任务.

结果是:

  • 任务列表的头部和尾部可以独立同步,减少列表上的争用.
  • 工作的重要子树被拆分并由同一个线程重新组装,因此这些子树不需要线程间协调.
  • 当一个线程窃取工作时,它会占用一大块,然后将其细分到自己的列表中
  • 钢结构工作意味着螺纹几乎完全被利用,直到工艺结束.

使用线程池的大多数其他分而治之的方案需要更多的线程间通信和协调.


vol*_*ley 12

在此示例中,Fork/Join不添加任何值,因为不需要分叉,并且工作负载在工作线程之间均匀分配.Fork/Join只会增加开销.

这是一篇关于这个主题的好文章.引用:

总的来说,我们可以说ThreadPoolExecutor是工作线程均匀分配工作负载的首选.为了能够保证这一点,您需要准确了解输入数据的外观.相比之下,ForkJoinPool无论输入数据如何都能提供良好的性能,因此是一种更加强大的解决方案.


ash*_*ley 8

另一个重要的区别似乎是,使用FJ,您可以执行多个复杂的"加入"阶段.考虑来自http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html的合并排序,预分割这项工作需要太多的编排.例如,你需要做以下事情:

  • 排序第一季度
  • 排在第二季度
  • 合并前两个季度
  • 排在第三季度
  • 排在第四季度
  • 合并过去的两个季度
  • 合并两半

如何指定在合并之前必须进行排序等.

我一直在研究如何最好地为每个项目列表做某件事.我想我会预先拆分列表并使用标准的ThreadPool.当工作不能被预分割成足够独立的任务时,FJ似乎最有用,但是可以递归地分割成彼此独立的任务(例如,将两半分开是独立的,但是将两个分类的一半合并为一个整齐的整体不是).


小智 6

当您进行昂贵的合并操作时,F/J也具有明显的优势.因为它分裂为树结构,所以只进行log2(n)合并,而不是n合并线性线程分裂.(这确实使理论假设你拥有与线程一样多的处理器,但仍然是一个优势)对于家庭作业,我们必须通过对每个索引处的值求和来合并数千个2D阵列(所有相同的维度).使用fork join和P处理器,当P接近无穷大时,时间接近log2(n).

1 2 3 .. 7 3 1 .... 8 5 4
4 5 6 + 2 4 3 => 6 9 9
7 8 9 .. 1 1 0 .... 8 9 9