多处理中的工作者是否有办法.Pool的apply_async可以捕获错误并继续?

eha*_*nom 3 python exception-handling multiprocessing

使用multiprocessing.Pool apply_async()时,代码中断会发生什么?我认为这包括例外情况,但可能还有其他因素会导致工作人员失败.

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
for f in files:
    pool.apply_async(workerfunct, args=(*args), callback=callbackfunct) 
Run Code Online (Sandbox Code Playgroud)

正如我现在所理解的那样,进程/工作程序失败(所有其他进程继续)并且没有执行抛出错误的任何内容,即使我用try/except捕获错误也是如此.

作为一个例子,通常我除了错误并输入默认值和/或打印出错误消息,然后代码继续.如果我的回调函数涉及写入文件,则使用默认值完成.

这位回答者写了一些关于它的文章:

我怀疑你没有看到你的示例代码发生任何事情的原因是因为所有的工作者函数调用都失败了.如果worker函数失败,则永远不会执行回调.除非您尝试从apply_async调用返回的AsyncResult对象中获取结果,否则根本不会报告失败.但是,由于您没有保存任何这些对象,因此您永远不会知道发生的故障.如果我是你,我会在你测试时尝试使用pool.apply,这样你就会在发生错误时立即看到错误.

dan*_*ano 6

如果您使用的是Python 3.2+,则可以使用error_callback关键字参数来处理在worker中引发的异常.

pool.apply_async(workerfunct, args=(*args), callback=callbackfunct, error_callback=handle_error) 
Run Code Online (Sandbox Code Playgroud)

handle_error 将使用异常对象作为参数调用.

如果不是,则必须将所有工作函数包装在try/中except以确保callback执行.(我认为你的印象是,在其他问题上我的回答不起作用,但事实并非如此.抱歉!):

def workerfunct(*args):
    try:
        # Stuff
    except Exception as e:
        # Do something here, maybe return e?

pool.apply_async(workerfunct, args=(*args), callback=callbackfunct) 
Run Code Online (Sandbox Code Playgroud)

如果您不想/不想更改实际要调用的函数,也可以使用包装函数:

def wrapper(func, *args):
    try:
        return func(*args)
    except Exception as e:
        return e

pool.apply_async(wrapper, args=(workerfunct, *args), callback=callbackfunct) 
Run Code Online (Sandbox Code Playgroud)