未检测到多处理池中抛出的异常

Rob*_*ens 64 python exception multiprocessing

似乎当从multiprocessing.Pool进程引发异常时,没有堆栈跟踪或任何其他指示它已失败.例:

from multiprocessing import Pool 

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go)
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)

打印1并静默停止.有趣的是,提高BaseException会起作用.有没有办法让所有异常的行为与BaseException相同?

Fah*_*tha 50

也许我错过了一些东西,但这不是getResult对象返回的方法吗?请参阅流程池.

class multiprocessing.pool.AsyncResult

Pool.apply_async()和Pool.map_async()返回的结果类.get([timeout])
返回结果.如果timeout不是None并且结果未在超时秒内到达,则引发multiprocessing.TimeoutError.如果远程调用引发了异常,那么get()将重新启动该异常.

所以,稍微修改你的例子就可以了

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)

这给出了结果

1
Traceback (most recent call last):
  File "rob.py", line 10, in <module>
    x.get()
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
    raise self._value
Exception: foobar
Run Code Online (Sandbox Code Playgroud)

这并不完全令人满意,因为它不会打印回溯,但总比没有好.

更新:这个错误已在Python 3.4中修复,由Richard Oudkerk提供.请参阅multiprocessing.pool.Async的问题get方法应该返回完整的回溯.

  • 注意:要为一堆同时运行的任务执行此操作,您应该将所有结果保存在列表中,然后使用get()遍历每个结果,如果您不想废弃,请使用try/catch包围第一个错误. (4认同)
  • 答案中的代码将等待`x.get()`,这破坏了异步应用任务的意义。@dfrankow 关于将结果保存到列表然后在最后“获取”它们的评论是一个更好的解决方案。 (2认同)

Rup*_*ash 29

我有一个合理的解决方案,至少在调试方面.我目前没有一个解决方案可以在主流程中引发异常.我的第一个想法是使用装饰器,但你只能在模块顶层定义的 pickle 函数,所以这就是正确的.

相反,一个简单的包装类和一个使用它的Pool子类apply_async(因此apply).我将map_async作为练习留给读者.

import traceback
from multiprocessing.pool import Pool
import multiprocessing

# Shortcut to multiprocessing's logger
def error(msg, *args):
    return multiprocessing.get_logger().error(msg, *args)

class LogExceptions(object):
    def __init__(self, callable):
        self.__callable = callable

    def __call__(self, *args, **kwargs):
        try:
            result = self.__callable(*args, **kwargs)

        except Exception as e:
            # Here we add some debugging help. If multiprocessing's
            # debugging is on, it will arrange to log the traceback
            error(traceback.format_exc())
            # Re-raise the original exception so the Pool worker can
            # clean up
            raise

        # It was fine, give a normal answer
        return result

class LoggingPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)

def go():
    print(1)
    raise Exception()
    print(2)

multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)

p.apply_async(go)
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)

这给了我:

1
[ERROR/PoolWorker-1] Traceback (most recent call last):
  File "mpdebug.py", line 24, in __call__
    result = self.__callable(*args, **kwargs)
  File "mpdebug.py", line 44, in go
    raise Exception()
Exception
Run Code Online (Sandbox Code Playgroud)

  • 我已经意识到可以使用装饰器,如果你使用`@ functools.wraps(func)`来装饰你的包装器.这使得装饰函数看起来像在模块顶层定义的函数. (5认同)

goz*_*lli 20

在撰写本文时获得最多选票的解决方案存在一个问题:

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()  ## waiting here for go() to complete...
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)

正如@dfrankow所指出的,它会等待x.get(),这会破坏异步运行任务的重点.因此,为了提高效率(特别是如果你的工作函数go需要很长时间),我会将其更改为:

from multiprocessing import Pool

def go(x):
    print(1)
    # task_that_takes_a_long_time()
    raise Exception("Can't go anywhere.")
    print(2)
    return x**2

p = Pool()
results = []
for x in range(1000):
    results.append( p.apply_async(go, [x]) )

p.close()

for r in results:
     r.get()
Run Code Online (Sandbox Code Playgroud)

优点:worker函数是异步运行的,因此,例如,如果您在多个核心上运行许多任务,那么它将比原始解决方案更有效.

缺点:如果worker函数中存在异常,则只有在池完成所有任务才会引发异常.这可能是也可能不是理想的行为.根据@ colinfang的评论编辑,修正了这个问题.


Mar*_*man 9

我已成功使用此装饰器记录异常:

import traceback, functools, multiprocessing

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except:
            print 'Exception in '+func.__name__
            traceback.print_exc()
    return wrapped_func
Run Code Online (Sandbox Code Playgroud)

用问题中的代码,它是

@trace_unhandled_exceptions
def go():
    print(1)
    raise Exception()
    print(2)

p = multiprocessing.Pool(1)

p.apply_async(go)
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)

只需装饰您传递给流程池的功能即可.这项工作的关键是@functools.wraps(func)多处理抛出一个PicklingError.

上面的代码给出

1
Exception in go
Traceback (most recent call last):
  File "<stdin>", line 5, in wrapped_func
  File "<stdin>", line 4, in go
Exception
Run Code Online (Sandbox Code Playgroud)


Vla*_*lad 7

由于已经有不错的答案multiprocessing.Pool可用,我将使用不同的方法提供一个解决方案以确保完整性。

对于python >= 3.2以下解决方案似乎是最简单的:

from concurrent.futures import ProcessPoolExecutor, wait

def go():
    print(1)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor() as p:
    for i in range(10):
        futures.append(p.submit(go))

results = [f.result() for f in futures]
Run Code Online (Sandbox Code Playgroud)

好处:

  • 很少的代码
  • 在主进程中引发异常
  • 提供堆栈跟踪
  • 没有外部依赖

有关 API 的更多信息,请查看

此外,如果您要提交大量任务,并且希望您的主流程在其中一项任务失败后立即失败,则可以使用以下代码段:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time


def go():
    print(1)
    time.sleep(0.3)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor(1) as p:
    for i in range(10):
        futures.append(p.submit(go))

    for f in as_completed(futures):
        if f.exception() is not None:
            for f in futures:
                f.cancel()
            break

[f.result() for f in futures]
Run Code Online (Sandbox Code Playgroud)

只有在执行完所有任务后,所有其他答案才会失败。


楊立群*_*楊立群 5

既然您已经使用过apply_sync,我猜用例是想要执行一些同步任务。使用回调进行处理是另一种选择。请注意,此选项仅适用于 python3.2 及以上版本,不适用于 python2.7。

from multiprocessing import Pool

def callback(result):
    print('success', result)

def callback_error(result):
    print('error', result)

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)

# You can do another things

p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)

  • 对于更高版本:https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.apply_async (2认同)