Python多处理性能

Bas*_*sen 11 python performance multiprocessing

这应该是我的第三个也是最后一个问题,关于我试图提高我在python上做的一些统计分析的性能.我有两个版本的代码(单核与多处理),我期望通过使用多个核来获得性能,因为我希望我的代码能够解压缩/解压缩相当多的二进制字符串,遗憾的是我注意到性能实际上通过使用多个来减少核心.

我想知道是否有人对我观察到的内容有可能的解释(向下滚动到4月16日更新以获取更多信息)?

程序的关键部分是函数numpy_array(多处理中的+解码),下面的代码片段(可通过pastebin访问完整代码,下面进一步说明):

def numpy_array(data, peaks):
    rt_counter=0
    for x in peaks:
        if rt_counter %(len(peaks)/20) == 0:
            update_progress()
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[rt_counter][1][peak_counter][0]=float(buff2)
            else:
                data[rt_counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        rt_counter+=1
Run Code Online (Sandbox Code Playgroud)

多处理版本使用一组函数执行此操作,我将在下面显示键2:

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def numpy_array(shared_arr,peaks):
    processors=mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size=int(len(peaks)/processors)
        map_parameters=[]
        for i in range(processors):
            counter = i*chunk_size
            chunk=peaks[i*chunk_size:(i+1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode,map_parameters)

def decode ((chunk, counter)):
    data=tonumpyarray(shared_arr).view(
        [('f0','<f4'), ('f1','<f4',(250000,2))])
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            #with shared_arr.get_lock():
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        counter+=1
Run Code Online (Sandbox Code Playgroud)

可以通过这些pastebin链接访问完整的程序代码

Pastebin为单核版本

用于多处理版本的Pastebin

我用每个时间点包含239个时间点和~180k测量对的文件观察到的性能对于单核为~2.5m,对于多处理为~3.5.

PS:之前的两个问题(我第一次试图进行并列化):

  1. Python多处理
  2. 使我的NumPy数组在进程间共享

- 4月16日 -

我一直在使用cProfile库来分析我的程序(cProfile.run('main()') 在其中__main__,这表明有一步减慢了一切:

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
23   85.859    3.733   85.859    3.733 {method 'acquire' of 'thread.lock' objects}
Run Code Online (Sandbox Code Playgroud)

我在这里不明白的是,thread.lock对象被用于threading(据我的理解)但不应该用于多处理,因为每个核心应该运行一个线程(除了拥有它自己的锁定机制),那么它是如何发生的为什么一次通话需要3.7秒?

900*_*000 2

共享数据是一种已知的因同步而导致速度变慢的情况。

您可以在进程之间拆分数据,或者为每个进程提供独立的副本吗?然后,在所有计算完成之前,您的流程不需要同步任何内容。

然后,我让主进程将所有工作处理器的输出加入到一个连贯的集合中。

该方法可能需要额外的 RAM,但现在 RAM 很便宜。

如果你问,我也对每个线程锁获取 3700 毫秒感到困惑。对于这样的特殊调用,OTOH 分析可能会出现错误。