Joblib parallel with multiple CPUs slower than a single CPU

mha*_*ger 24 python parallel-processing

I have just started using the Joblib module and I am trying to understand how the Parallel function works. Below is an example where parallelizing leads to a longer runtime, but I don't understand why. My runtime is 51 seconds on 1 CPU versus 217 seconds on 2 CPUs.

My assumption was that running the loop in parallel would copy the lists a and b to each processor. Then item_n would be dispatched to one CPU and item_n+1 to the other, the function would be executed, and the results would be written back to a list (in order). Then the next two items would be grabbed, and so on. I am obviously missing something.

Is this a poor example, or a poor use of joblib? Did I simply structure the code wrong?

Here is the example:

import numpy as np
from matplotlib.path import Path
from joblib import Parallel, delayed

## Create pairs of points for line segments
a = list(zip(np.random.rand(5000,2),np.random.rand(5000,2)))  # list() needed on Python 3, where zip returns a one-shot iterator

b = list(zip(np.random.rand(300,2),np.random.rand(300,2)))

## Check if one line segment contains another. 
def check_paths(path, paths):
    for other_path in paths:
        res='no cross'
        chck = Path(other_path)
        if chck.contains_path(path)==1:
            res= 'cross'
            break
    return res

res = Parallel(n_jobs=2) (delayed(check_paths) (Path(points), a) for points in b)

Nab*_*bla 36

In short: I cannot reproduce your problem. If you are on Windows you should use a guard for your main loop: see the documentation of joblib.Parallel. The only problem I see is a lot of data-copying overhead, but your numbers seem unrealistically large to be caused by that.

At length, here are my timings with your code:

On my i7 3770k (4 cores, 8 threads) I get the following results for different values of n_jobs:

For-loop: Finished in 33.8521318436 sec
n_jobs=1: Finished in 33.5527760983 sec
n_jobs=2: Finished in 18.9543449879 sec
n_jobs=3: Finished in 13.4856410027 sec
n_jobs=4: Finished in 15.0832719803 sec
n_jobs=5: Finished in 14.7227740288 sec
n_jobs=6: Finished in 15.6106669903 sec

So there is a gain in using multiple processes. However, although I have four cores, the gain already saturates at three processes. So I guess the execution time is actually bound by memory access rather than processor time.

You should notice that the arguments of each single loop entry are copied to the process executing it. This means you copy a for every element of b, which is inefficient. Instead, access a as a global. (Parallel forks the processes, copying all global variables to the newly spawned ones, so a is accessible.) This gives me the following code (with timing, and with the main-loop guard that the joblib documentation recommends):

import numpy as np
from matplotlib.path import Path
from joblib import Parallel, delayed
import time
import sys

## Check if one line segment contains another. 

def check_paths(path):
    for other_path in a:
        res='no cross'
        chck = Path(other_path)
        if chck.contains_path(path)==1:
            res= 'cross'
            break
    return res

if __name__ == '__main__':
    ## Create pairs of points for line segments
    a = list(zip(np.random.rand(5000,2),np.random.rand(5000,2)))  # list() needed on Python 3, where zip returns a one-shot iterator
    b = list(zip(np.random.rand(300,2),np.random.rand(300,2)))

    now = time.time()
    if len(sys.argv) >= 2:
        res = Parallel(n_jobs=int(sys.argv[1])) (delayed(check_paths) (Path(points)) for points in b)
    else:
        res = [check_paths(Path(points)) for points in b]
    print("Finished in", time.time() - now, "sec")

Timing results:

 n_jobs=1: Finished in 34.2845709324 sec
 n_jobs=2: Finished in 16.6254048347 sec
 n_jobs=3: Finished in 11.219119072 sec
 n_jobs=4: Finished in 8.61683392525 sec
 n_jobs=5: Finished in 8.51907801628 sec
 n_jobs=6: Finished in 8.21842098236 sec
 n_jobs=7: Finished in 8.21816396713 sec
 n_jobs=8: Finished in 7.81841087341 sec

The saturation point has now moved slightly, to n_jobs=4, which is the expected value.

check_paths does several redundant calculations that can easily be eliminated. Firstly, the line chck = Path(other_path) is executed for all elements of a in every call. Precompute that. Secondly, the string res='no cross' is written on every loop turn, although it can only change once (followed by the break and return). Move that line in front of the loop. Then the code looks like this:

import numpy as np
from matplotlib.path import Path
from joblib import Parallel, delayed
import time
import sys

## Check if one line segment contains another. 

def check_paths(path):
    res = 'no cross'
    for other_path in a:
        if other_path.contains_path(path)==1:
            res= 'cross'
            break
    return res

if __name__ == '__main__':
    ## Create pairs of points for line segments
    a = zip(np.random.rand(5000,2),np.random.rand(5000,2))
    a = [Path(x) for x in a]

    b = zip(np.random.rand(300,2),np.random.rand(300,2))

    now = time.time()
    if len(sys.argv) >= 2:
        res = Parallel(n_jobs=int(sys.argv[1])) (delayed(check_paths) (Path(points)) for points in b)
    else:
        res = [check_paths(Path(points)) for points in b]
    print("Finished in", time.time() - now, "sec")

With timings:

n_jobs=1: Finished in 5.33742594719 sec
n_jobs=2: Finished in 2.70858597755 sec
n_jobs=3: Finished in 1.80810618401 sec
n_jobs=4: Finished in 1.40814709663 sec
n_jobs=5: Finished in 1.50854086876 sec
n_jobs=6: Finished in 1.50901818275 sec
n_jobs=7: Finished in 1.51030707359 sec
n_jobs=8: Finished in 1.51062297821 sec

One side note on the code, though I haven't really followed its purpose as it is unrelated to your question: contains_path will only return True if this path completely contains the given path (see the documentation). Therefore, given random input, your function will basically always return no cross.
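To illustrate that semantics (a hypothetical example with made-up coordinates, not the question's data): a path only counts as contained when it lies entirely inside the other one.

```python
from matplotlib.path import Path

outer = Path([(0, 0), (4, 0), (4, 4), (0, 4), (0, 0)])   # big square
inner = Path([(1, 1), (2, 1), (2, 2), (1, 2), (1, 1)])   # small square fully inside it
crossing = Path([(2, 2), (6, 2)])                        # segment poking outside the square

print(outer.contains_path(inner))     # True: completely inside
print(outer.contains_path(crossing))  # False: only partially inside
```

For two random line segments, full containment is essentially impossible, which is why the function above always reports no cross.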


Gae*_*aux 18

In addition to the answer above, and for future reference, there are two aspects to this question, and joblib's recent evolutions help with both.

Parallel-pool creation overhead: the problem here is that creating a parallel pool is costly. It was especially costly in this case, because the code not protected by the "main" guard was run by each job at the creation of the Parallel object. In the latest joblib (still in beta), Parallel can be used as a context manager to limit the number of times a pool is created, and hence the impact of this overhead.
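As a minimal sketch of that usage (assuming a joblib version that supports it), the context manager keeps a single worker pool alive across several Parallel calls instead of spawning and tearing one down per call:

```python
from joblib import Parallel, delayed

def square(x):
    # trivial stand-in task; in practice each call would do real work
    return x * x

# one pool is created on entry and reused for both calls below;
# on Windows this would need to live under an `if __name__ == '__main__':` guard
with Parallel(n_jobs=2) as parallel:
    first = parallel(delayed(square)(i) for i in range(5))
    second = parallel(delayed(square)(i) for i in range(5, 10))

print(first)   # [0, 1, 4, 9, 16]
print(second)  # [25, 36, 49, 64, 81]
```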

Dispatching overhead: it is important to keep in mind that dispatching an item of the for loop has an overhead (much bigger than iterating a for loop without parallelism). Thus, if the individual computation items are very fast, this overhead will dominate the computation. In the latest joblib, joblib will trace the execution time of each job and start bunching them together if they are very fast. In most cases this vastly limits the impact of the dispatch overhead (see the PR for a benchmark and discussion).
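A sketch of this effect (batch_size is an actual Parallel parameter; the exact timings depend on the machine and joblib version, so no specific numbers are claimed here): forcing one dispatch per tiny task can be compared against joblib's automatic batching.

```python
import time
from joblib import Parallel, delayed

def tiny_task(x):
    # far cheaper than the cost of dispatching it to a worker
    return x + 1

items = list(range(500))

start = time.time()
# one dispatch per item: scheduling overhead dominates the actual work
res_single = Parallel(n_jobs=2, batch_size=1)(delayed(tiny_task)(i) for i in items)
t_single = time.time() - start

start = time.time()
# 'auto' (the default) lets joblib grow the batch size from measured task durations
res_auto = Parallel(n_jobs=2, batch_size='auto')(delayed(tiny_task)(i) for i in items)
t_auto = time.time() - start

print("batch_size=1: %.2fs, batch_size='auto': %.2fs" % (t_single, t_auto))
```

Both calls return the same results; only the number of round-trips to the workers differs.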


Disclaimer: I am the original author of joblib (just to warn against potential conflicts of interest in my answer, although here I think that it is irrelevant).