Minimizing the overhead of Python multiprocessing.Pool with numpy/scipy

Han*_*uys 9 python parallel-processing numpy pool multiprocessing

I've spent several hours on different attempts to parallelize my number-crunching code, but it only gets slower when I do so. Unfortunately, the problem disappears when I try to reduce it to the example below, and I don't really want to post the whole program here. So the question is: what pitfalls should I avoid in this type of program?

(Note: a follow-up to Unutbu's answer is at the bottom.)

Here are the circumstances:

  • It's about a module that defines a class BigData holding a lot of internal data. In the example there is one list ff of interpolation functions; in the actual program there are more, e.g. ffA[k], ffB[k], ffC[k].
  • The calculation would be classified as "embarrassingly parallel": the work can be done on smaller chunks of data at a time. In the example, that's do_chunk().
  • The approach shown in the example would, in my actual program, give the worst performance: about 1 second per chunk (on top of the 0.1 second or so of actual calculation time when done in a single thread). So, for n = 50, do_single() would run in 5 seconds and do_multi() would run in 55 seconds.
  • I also tried to split up the work by slicing the xi and yi arrays into contiguous blocks and iterating over all k values in each chunk (a sketch of that variant follows this list). That worked a bit better: the total execution time no longer differed whether I used 1, 2, 3, or 4 threads. But of course, I want to see an actual speedup!
  • This may be related: Multiprocessing.Pool makes Numpy matrix multiplication slower. However, elsewhere in the program I used a multiprocessing pool for calculations that were much more isolated: a function (not bound to a class) looking something like def do_chunk(array1, array2, array3) that does numpy-only calculations on those arrays. There, there was a significant speed boost.
  • CPU usage scales with the number of parallel processes as expected (300% CPU usage for three processes).
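For reference, here is a minimal sketch of that alternative chunking, assuming the BigData class and module-level wrapper from the example below; the chunk length and pool handling are illustrative, not taken from my actual program:

from multiprocessing import Pool

def _do_block_wrapper(bd, xb, yb):
    # evaluate every interpolation function on one contiguous block of points;
    # note that bd is still pickled and shipped to a worker for every call
    return sum(bd.do_chunk(k, xb, yb) for k in range(bd.n))

def do_multi_by_block(bd, numproc, xi, yi, chunk_len=100*1000):
    # slice xi/yi into contiguous blocks and hand each block to the pool
    pool = Pool(numproc)
    jobs = []
    for i0 in range(0, len(xi), chunk_len):
        i1 = min(i0 + chunk_len, len(xi))
        jobs.append(pool.apply_async(_do_block_wrapper, (bd, xi[i0:i1], yi[i0:i1])))
    total = sum(j.get(timeout=30) for j in jobs)
    pool.close()
    pool.join()
    return total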
#!/usr/bin/python2.7

import numpy as np, time, sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline

_tm=0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm-_tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n,n,n))
        self.ff = []
        for i in range(n):
            f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = Pool(numproc)
        stopwatch('Pool setup')
        for k in range(self.n):
            p = pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        sum = 0.0
        for k in range(self.n):
            # Edit/bugfix: replaced p.get by procs[k].get
            sum += np.sum(procs[k].get(timeout=30)) # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return sum

    def do_single(self, xi, yi):
        sum = 0.0
        for k in range(self.n):
            sum += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return sum

def _do_chunk_wrapper(bd, k, xi, yi): # must be outside the class for apply_async to work
    return bd.do_chunk(k, xi, yi)

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)

Output:

Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds

The timings are on an Intel Core i3-3227 CPU with 2 cores and 4 threads, running 64-bit Linux. For the actual program, the multiprocessing version (using the pool mechanism, even when restricted to a single core) was a factor of 10 slower than the single-process version.

Follow-up

Unutbu's answer got me on the right track. In the actual program, self was pickled into a 37 to 140 MB object that had to be passed to the worker processes. Worse, Python pickling is quite slow: the pickling itself took several seconds, and it happened for every chunk of work handed to the workers. Apart from pickling and passing big data objects, the overhead of apply_async on Linux is very small; for a small function (adding a few integer arguments), it takes only 0.2 ms per apply_async/get pair. So splitting the work into very small chunks is not a problem by itself. Therefore, I now pass all big array arguments as indices into global variables, and I keep the chunk size small for the sake of CPU cache optimization.
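As an aside, a minimal sketch of how that per-call overhead can be measured; the trivial worker function and call count here are only illustrative:

import time
from multiprocessing import Pool

def _tiny_job(a, b, c):
    # trivial work, so the measured time is dominated by IPC overhead
    return a + b + c

if __name__ == "__main__":
    n_calls = 2000
    pool = Pool(1)
    t0 = time.time()
    async_results = [pool.apply_async(_tiny_job, (i, 2*i, 3*i)) for i in range(n_calls)]
    checksum = sum(r.get(timeout=30) for r in async_results)
    dt = time.time() - t0
    pool.close()
    pool.join()
    print("%.3f ms per apply_async/get pair (checksum %d)" %
          (1000.0 * dt / n_calls, checksum))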

The shared data is stored in a global dict; the entries are deleted in the parent process right after the worker pool is set up. Only the dict keys are transmitted to the workers. The only big data that still needs pickling/IPC is the new data created by the workers.

#!/usr/bin/python2.7

import numpy as np, sys
from multiprocessing import Pool

_mproc_data = {}  # global storage for objects during multiprocessing.

class BigData:
    def __init__(self, size):
        self.blah = np.random.uniform(0, 1, size=size)

    def do_chunk(self, k, xi, yi):
        # do the work and return an array of the same shape as xi, yi
        zi = k*np.ones_like(xi)
        return zi

    def do_all_work(self, xi, yi, num_proc):
        global _mproc_data
        mp_key = str(id(self))
        _mproc_data['bd'+mp_key] = self # BigData
        _mproc_data['xi'+mp_key] = xi
        _mproc_data['yi'+mp_key] = yi
        pool = Pool(processes=num_proc)
        # processes have now inherited the global variables; clean up in the parent process
        for v in ['bd', 'xi', 'yi']:
            del _mproc_data[v+mp_key]

        # setup indices for the worker processes (placeholder)
        n_chunks = 45
        n = len(xi)
        chunk_len = n//n_chunks
        i1list = np.arange(0,n,chunk_len)
        i2list = i1list + chunk_len
        i2list[-1] = n
        klist = range(n_chunks) # placeholder

        procs = []
        for i in range(n_chunks):
            p = pool.apply_async( _do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i]) )
            sys.stderr.write(".")
            procs.append(p)
        sys.stderr.write("\n")

        # allocate space for combined results
        zi = np.zeros_like(xi)

        # get data from workers and finish  
        for i, p in enumerate(procs):
            zi[i1list[i]:i2list[i]] = p.get(timeout=30) # timeout allows ctrl-C handling

        pool.close()
        pool.join()

        return zi

def _do_chunk_wrapper(key, i1, i2, k):
    """All arguments are small objects."""
    global _mproc_data
    bd = _mproc_data['bd'+key]
    xi = _mproc_data['xi'+key][i1:i2]
    yi = _mproc_data['yi'+key][i1:i2]
    return bd.do_chunk(k, xi, yi)


if __name__ == "__main__":
    xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
    bd = BigData(int(1e7))
    bd.do_all_work(xi, yi, 4)

Here are the results of a speed test (again with 2 cores / 4 threads), varying the number of worker processes and the amount of memory per chunk (total bytes of the xi, yi, zi array slices). The numbers are in "million result values per second", but that doesn't matter much for the comparison. The row for "1 process" is a direct call of do_chunk with the full input data, without any subprocesses.

#Proc   125K    250K    500K   1000K   unlimited
1                                      0.82 
2       4.28    1.96    1.3     1.31 
3       2.69    1.06    1.06    1.07 
4       2.17    1.27    1.23    1.28 

The impact of the in-memory data size is quite large. The CPU has 3 MB of shared L3 cache plus 256 KB of L2 cache per core. Note that the calculation also needs access to several MB of internal data of the BigData object. Hence, what we learn from this is that it is useful to do this kind of speed test. For this program, 2 processes is fastest, followed by 4, with 3 being the slowest.
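For completeness, a rough sketch of how such a sweep could be scripted. It assumes a hypothetical do_all_work variant that also accepts the chunk length (the listing above hard-codes n_chunks), and the sizes and process counts below are just the ones from the table:

import time

def sweep(bd, xi, yi):
    # hypothetical benchmark driver: bd.do_all_work is assumed to take the
    # chunk length as an extra argument (not the case in the listing above)
    bytes_per_value = 3 * 8  # one float64 each from the xi, yi, zi slices
    for num_proc in (1, 2, 3, 4):
        for chunk_bytes in (125000, 250000, 500000, 1000000):
            chunk_len = chunk_bytes // bytes_per_value
            t0 = time.time()
            bd.do_all_work(xi, yi, num_proc, chunk_len)
            dt = time.time() - t0
            print("%d proc, %7d B/chunk: %.2f M values/s" %
                  (num_proc, chunk_bytes, len(xi) / dt / 1e6))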

unu*_*tbu 13

Try to reduce interprocess communication. In the multiprocessing module, all (single-machine) interprocess communication is done through Queues. Objects passed through a Queue are pickled. So try to send fewer and/or smaller objects through the Queue.

  • Do not send self, the instance of BigData, through the Queue. It is rather big, and it gets bigger as the amount of data in self grows:

    In [6]: import pickle
    In [14]: len(pickle.dumps(BigData(50)))
    Out[14]: 1052187
    

    Every time pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi)) is called, self is pickled in the main process and unpickled in the worker process. The size of len(pickle.dumps(BigData(N))) grows as N increases.

  • Let the data be read from global variables. On Linux, you can take advantage of Copy-on-Write. As Jan-Philip Gehrcke explains:

    After fork(), parent and child are in an equivalent state. It would be stupid to copy the entire memory of the parent to another place in the RAM. That's [where] the copy-on-write principle [comes] from. As long as the child does not change its memory state, it actually accesses the parent's memory. Only upon modification, the corresponding bits and pieces are copied into the memory space of the child.

    Thus, you can avoid passing instances of BigData through the Queue by simply defining the instance as a global, bd = BigData(n), (as you are already doing) and referring to its values in the worker processes (e.g. _do_chunk_wrapper). It basically amounts to removing self from the call to pool.apply_async:

    p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
    

    and accessing bd as a global, with the necessary attendant changes to do_chunk_wrapper's call signature.

  • Try to pass longer-running functions, func, to pool.apply_async. If you have many quickly-completing calls to pool.apply_async, then the overhead of passing arguments and return values through the Queue becomes a significant part of the overall time. If instead you make fewer calls to pool.apply_async and give each func more work to do before returning a result, then interprocess communication becomes a smaller fraction of the overall time.

    Below, I modified _do_chunk_wrapper to accept k_start and k_end arguments, so that each call to pool.apply_async computes the sum over many values of k before returning a result.


import math
import numpy as np
import time
import sys
import multiprocessing as mp
import scipy.interpolate as interpolate

_tm=0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm-_tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n,n,n))
        self.ff = []
        for i in range(n):
            f = interpolate.RectBivariateSpline(
                np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        n = self.n
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_chunk_of_chunks(self, k_start, k_end, xi, yi):
        s = sum(np.sum(np.exp(self.ff[k].ev(xi, yi)))
                    for k in range(k_start, k_end))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = mp.Pool(numproc)
        stopwatch('\nPool setup')
        ks = list(map(int, np.linspace(0, self.n, numproc+1)))
        for i in range(len(ks)-1):
            k_start, k_end = ks[i:i+2]
            p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        total = 0.0
        for k, p in enumerate(procs):
            total += np.sum(p.get(timeout=30)) # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        print(total)
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return total

    def do_single(self, xi, yi):
        total = 0.0
        for k in range(self.n):
            total += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return total

def _do_chunk_wrapper(k_start, k_end, xi, yi): 
    return bd.do_chunk_of_chunks(k_start, k_end, xi, yi)        

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)

yields

Initialized: 0.15 seconds

Pool setup: 0.06 seconds
Jobs queued (2 processes): 0.00 seconds

First get() done: 6.56 seconds
83963796.0404
Jobs done: 0.55 seconds
..
Pool setup: 0.08 seconds
Jobs queued (3 processes): 0.00 seconds

First get() done: 5.19 seconds
83963796.0404
Jobs done: 1.57 seconds
...
All in single process: 12.13 seconds

compared with the original code:

Initialized: 0.10 seconds
Pool setup: 0.03 seconds
Jobs queued (2 processes): 0.00 seconds

First get() done: 10.47 seconds
Jobs done: 0.00 seconds
..................................................
Pool setup: 0.12 seconds
Jobs queued (3 processes): 0.00 seconds

First get() done: 9.21 seconds
Jobs done: 0.00 seconds
..................................................
All in single process: 12.12 seconds