检索使用multiprocessing.Pool.map启动的进程的退出代码

Mat*_*ois 3 python multiprocessing

我正在使用python multiprocessing模块来并行化一些计算量很大的任务.显而易见的选择是使用一个Pool工人然后使用该map方法.

但是,进程可能会失败.例如,他们可能会被悄悄地杀死,例如被杀死oom-killer.因此,我希望能够检索启动过程的退出代码map.

另外,为了记录目的,我希望能够知道为执行迭代中的每个值而启动的进程的PID.

dan*_*ano 6

如果您正在使用multiprocessing.Pool.map通常对池中子流程的退出代码不感兴趣,那么您对它们从工作项返回的值感兴趣.这是因为在正常情况下,a中的进程在Poolclose/ join池之前不会退出,因此在完成所有工作之前没有要检索的退出代码,并且Pool即将销毁.因此,没有公共API来获取这些子流程的退出代码.

现在,你担心异常情况,带外的东西会在其工作的同时杀死其中一个子流程.如果你遇到这样的问题,你可能会遇到一些奇怪的行为.事实上,在我的测试中,我在一段Pool时间内杀死了一个进程,它正在作为一个map调用的一部分工作,map从未完成,因为被杀死的进程没有完成.但是,Python确实立即启动了一个新进程来替换我杀死的进程.

也就是说,您可以通过multiprocessing.Process使用private _pool属性直接访问池中的对象来获取池中每个进程的pid :

pool = multiprocessing.Pool()
for proc in pool._pool:
  print proc.pid
Run Code Online (Sandbox Code Playgroud)

因此,您可以尝试检测进程何时意外死亡(假设您不会因此而陷入阻塞调用).您可以通过在调用之前和之后检查池中的进程列表来执行此操作map_async:

before = pool._pool[:]  # Make a copy of the list of Process objects in our pool
result = pool.map_async(func, iterable)  # Use map_async so we don't get stuck.
while not result.ready():  # Wait for the call to complete
    if any(proc.exitcode for proc in before):  # Abort if one of our original processes is dead.
        print "One of our processes has exited. Something probably went horribly wrong."
        break
    result.wait(timeout=1)
else:  # We'll enter this block if we don't reach `break` above.
    print result.get() # Actually fetch the result list here.
Run Code Online (Sandbox Code Playgroud)

我们必须复制一个列表,因为当一个进程在Pooldie中时,Python立即用一个新进程替换它,并从列表中删除死的一个.

这在我的测试中对我有用,但因为它依赖于Pool对象的私有属性(_pool),所以在生产代码中使用它是有风险的.我还建议,过多担心这种情况可能有点过头了,因为它不太可能发生并且使实施变得非常复杂.