如何利用python多处理来利用所有核心

Dar*_*one 21 python multiprocessing

现在已经摆弄了Python的multicore功能超过一个小时,试图使用Process和并行化一个相当复杂的图遍历函数Manager:

import networkx as nx
import csv
import time 
from operator import itemgetter
import os
import multiprocessing as mp

cutoff = 1

exclusionlist = ["cpd:C00024"]

DG = nx.read_gml("KeggComplete.gml", relabel = True)

for exclusion in exclusionlist:
    DG.remove_node(exclusion)

#checks if 'memorizedPaths exists, and if not, creates it
fn = os.path.join(os.path.dirname(__file__), 'memorizedPaths' + str(cutoff+1))
if not os.path.exists(fn):
    os.makedirs(fn)

manager = mp.Manager()
memorizedPaths = manager.dict()
filepaths = manager.dict()
degreelist = sorted(DG.degree_iter(),key=itemgetter(1),reverse=True)

def _all_simple_paths_graph(item, DG, cutoff, memorizedPaths, filepaths):
    source = item[0]
    uniqueTreePaths = []
    if cutoff < 1:
        return
    visited = [source]
    stack = [iter(DG[source])]
    while stack:
        children = stack[-1]
        child = next(children, None)
        if child is None:
            stack.pop()
            visited.pop()
        elif child in memorizedPaths:
            for path in memorizedPaths[child]:
                newPath = (tuple(visited) + tuple(path))
                if (len(newPath) <= cutoff) and (len(set(visited) & set(path)) == 0):
                    uniqueTreePaths.append(newPath)
            continue
        elif len(visited) < cutoff:
            if child not in visited:
                visited.append(child)
                stack.append(iter(DG[child]))
                if visited not in uniqueTreePaths:
                    uniqueTreePaths.append(tuple(visited))
        else: #len(visited) == cutoff:
            if (visited not in uniqueTreePaths) and (child not in visited):
                uniqueTreePaths.append(tuple(visited + [child]))
            stack.pop()
            visited.pop()
    #writes the absolute path of the node path file into the hash table
    filepaths[source] = str(fn) + "/" + str(source) +"path.txt"
    with open (filepaths[source], "wb") as csvfile2:
        writer = csv.writer(csvfile2, delimiter=' ', quotechar='|')
        for path in uniqueTreePaths:
            writer.writerow(path)
    memorizedPaths[source] = uniqueTreePaths

############################################################################

start = time.clock()
if __name__ == '__main__':
    for item in degreelist:
        test = mp.Process(target=_all_simple_paths_graph, args=(DG, cutoff, item, memorizedPaths, filepaths))
        test.start()
        test.join()
end = time.clock()
print (end-start)
Run Code Online (Sandbox Code Playgroud)

目前 - 虽然运气和魔术 - 它有效(有点).我的问题是我只使用了24个内核中的12个.

有人可以解释为什么会出现这种情况吗?也许我的代码不是最好的多处理解决方案,或者它是我的架构的一个特性[Intel Xeon CPU E5-2640 @ 2.50GHz x18在Ubuntu 13.04 x64上运行]?

编辑:

我设法得到:

p = mp.Pool()
for item in degreelist:
    p.apply_async(_all_simple_paths_graph, args=(DG, cutoff, item, memorizedPaths, filepaths))
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)

然而,工作非常慢!所以我假设我正在使用错误的功能.希望它有助于澄清我正在努力实现的目标!

EDIT2:.map尝试:

partialfunc = partial(_all_simple_paths_graph, DG=DG, cutoff=cutoff, memorizedPaths=memorizedPaths, filepaths=filepaths)
p = mp.Pool()
for item in processList:
    processVar = p.map(partialfunc, xrange(len(processList)))   
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)

作品,比单曲慢.时间优化!

Tim*_*ers 45

太多堆放在这里的意见来解决,因此,这里mpmultiprocessing:

mp.cpu_count()应该返回处理器的数量.但要测试一下.有些平台很时髦,而且这些信息并不容易获得.Python尽其所能.

如果你开始24个流程,它们就会完全按照你的要求去做;-)看起来mp.Pool()对你来说最方便.您将要创建的进程数传递给其构造函数. mp.Pool(processes=None)mp.cpu_count()用于处理器的数量.

然后,您可以使用,例如,.imap_unordered(...)在您的Pool实例上传播您的degreelist跨进程.或者也许其他一些Pool方法对你更有效 - 实验.

如果你不能把问题打到Pool世界的视野中,你可以改为创建mp.Queue一个工作队列,.put()在主程序中创建一个节点(或节点片,以减少开销)并写入工人.get()从那个队列中解决问题.询问您是否需要示例.请注意,您需要在所有"真实"工作项之后将队列值(每个进程一个)放在队列中,以便工作进程可以测试标记以了解它们何时完成.

仅供参考,我喜欢排队因为他们更明确.许多人喜欢Pool更好,因为他们更神奇;-)

池示例

这是一个可执行的原型.这显示了一种使用方式imap_unordered,Pool并且chunksize不需要更改任何功能签名.当然,你必须插入你的真实代码;-)请注意,该init_worker方法允许每个处理器只传递一次"大部分"参数,而不是每个项目中的每个项目传递一次degreeslist.减少进程间通信量对于速度至关重要.

import multiprocessing as mp

def init_worker(mps, fps, cut):
    global memorizedPaths, filepaths, cutoff
    global DG

    print "process initializing", mp.current_process()
    memorizedPaths, filepaths, cutoff = mps, fps, cut
    DG = 1##nx.read_gml("KeggComplete.gml", relabel = True)

def work(item):
    _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)

def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths):
    pass # print "doing " + str(item)

if __name__ == "__main__":
    m = mp.Manager()
    memorizedPaths = m.dict()
    filepaths = m.dict()
    cutoff = 1 ##
    # use all available CPUs
    p = mp.Pool(initializer=init_worker, initargs=(memorizedPaths,
                                                   filepaths,
                                                   cutoff))
    degreelist = range(100000) ##
    for _ in p.imap_unordered(work, degreelist, chunksize=500):
        pass
    p.close()
    p.join()
Run Code Online (Sandbox Code Playgroud)

我强烈建议完全按原样运行,这样你就可以看到它的速度非常快.然后稍微添加一些东西,看看它会如何影响时间.例如,只需添加

   memorizedPaths[item] = item
Run Code Online (Sandbox Code Playgroud)

_all_simple_paths_graph()减缓下来极大.为什么?因为每次添加时dict变得越来越大,并且这个过程安全的dict必须在所有进程之间同步(在封面下).同步单元是"整个字典" - 没有内部结构,mp机器可以利用它来对共享字典进行增量更新.

如果你负担不起这笔费用,那么就不能用Manager.dict()这个了.聪明的机会比比皆是;-)