Rob*_*ens 64 python exception multiprocessing
似乎当从multiprocessing.Pool进程引发异常时,没有堆栈跟踪或任何其他指示它已失败.例:
from multiprocessing import Pool
def go():
print(1)
raise Exception()
print(2)
p = Pool()
p.apply_async(go)
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
打印1并静默停止.有趣的是,提高BaseException会起作用.有没有办法让所有异常的行为与BaseException相同?
Fah*_*tha 50
也许我错过了一些东西,但这不是getResult对象返回的方法吗?请参阅流程池.
class multiprocessing.pool.AsyncResult
Pool.apply_async()和Pool.map_async()返回的结果类.get([timeout])
返回结果.如果timeout不是None并且结果未在超时秒内到达,则引发multiprocessing.TimeoutError.如果远程调用引发了异常,那么get()将重新启动该异常.
所以,稍微修改你的例子就可以了
from multiprocessing import Pool
def go():
print(1)
raise Exception("foobar")
print(2)
p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
这给出了结果
1
Traceback (most recent call last):
File "rob.py", line 10, in <module>
x.get()
File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
raise self._value
Exception: foobar
Run Code Online (Sandbox Code Playgroud)
这并不完全令人满意,因为它不会打印回溯,但总比没有好.
更新:这个错误已在Python 3.4中修复,由Richard Oudkerk提供.请参阅multiprocessing.pool.Async的问题get方法应该返回完整的回溯.
Rup*_*ash 29
我有一个合理的解决方案,至少在调试方面.我目前没有一个解决方案可以在主流程中引发异常.我的第一个想法是使用装饰器,但你只能在模块顶层定义的 pickle 函数,所以这就是正确的.
相反,一个简单的包装类和一个使用它的Pool子类apply_async(因此apply).我将map_async作为练习留给读者.
import traceback
from multiprocessing.pool import Pool
import multiprocessing
# Shortcut to multiprocessing's logger
def error(msg, *args):
return multiprocessing.get_logger().error(msg, *args)
class LogExceptions(object):
def __init__(self, callable):
self.__callable = callable
def __call__(self, *args, **kwargs):
try:
result = self.__callable(*args, **kwargs)
except Exception as e:
# Here we add some debugging help. If multiprocessing's
# debugging is on, it will arrange to log the traceback
error(traceback.format_exc())
# Re-raise the original exception so the Pool worker can
# clean up
raise
# It was fine, give a normal answer
return result
class LoggingPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)
def go():
print(1)
raise Exception()
print(2)
multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)
p.apply_async(go)
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
这给了我:
1
[ERROR/PoolWorker-1] Traceback (most recent call last):
File "mpdebug.py", line 24, in __call__
result = self.__callable(*args, **kwargs)
File "mpdebug.py", line 44, in go
raise Exception()
Exception
Run Code Online (Sandbox Code Playgroud)
goz*_*lli 20
在撰写本文时获得最多选票的解决方案存在一个问题:
from multiprocessing import Pool
def go():
print(1)
raise Exception("foobar")
print(2)
p = Pool()
x = p.apply_async(go)
x.get() ## waiting here for go() to complete...
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
正如@dfrankow所指出的,它会等待x.get(),这会破坏异步运行任务的重点.因此,为了提高效率(特别是如果你的工作函数go需要很长时间),我会将其更改为:
from multiprocessing import Pool
def go(x):
print(1)
# task_that_takes_a_long_time()
raise Exception("Can't go anywhere.")
print(2)
return x**2
p = Pool()
results = []
for x in range(1000):
results.append( p.apply_async(go, [x]) )
p.close()
for r in results:
r.get()
Run Code Online (Sandbox Code Playgroud)
优点:worker函数是异步运行的,因此,例如,如果您在多个核心上运行许多任务,那么它将比原始解决方案更有效.
缺点:如果worker函数中存在异常,则只有在池完成所有任务后才会引发异常.这可能是也可能不是理想的行为.根据@ colinfang的评论编辑,修正了这个问题.
我已成功使用此装饰器记录异常:
import traceback, functools, multiprocessing
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
try:
func(*args, **kwargs)
except:
print 'Exception in '+func.__name__
traceback.print_exc()
return wrapped_func
Run Code Online (Sandbox Code Playgroud)
用问题中的代码,它是
@trace_unhandled_exceptions
def go():
print(1)
raise Exception()
print(2)
p = multiprocessing.Pool(1)
p.apply_async(go)
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
只需装饰您传递给流程池的功能即可.这项工作的关键是@functools.wraps(func)多处理抛出一个PicklingError.
上面的代码给出
1
Exception in go
Traceback (most recent call last):
File "<stdin>", line 5, in wrapped_func
File "<stdin>", line 4, in go
Exception
Run Code Online (Sandbox Code Playgroud)
由于已经有不错的答案multiprocessing.Pool可用,我将使用不同的方法提供一个解决方案以确保完整性。
对于python >= 3.2以下解决方案似乎是最简单的:
from concurrent.futures import ProcessPoolExecutor, wait
def go():
print(1)
raise Exception()
print(2)
futures = []
with ProcessPoolExecutor() as p:
for i in range(10):
futures.append(p.submit(go))
results = [f.result() for f in futures]
Run Code Online (Sandbox Code Playgroud)
好处:
有关 API 的更多信息,请查看此
此外,如果您要提交大量任务,并且希望您的主流程在其中一项任务失败后立即失败,则可以使用以下代码段:
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time
def go():
print(1)
time.sleep(0.3)
raise Exception()
print(2)
futures = []
with ProcessPoolExecutor(1) as p:
for i in range(10):
futures.append(p.submit(go))
for f in as_completed(futures):
if f.exception() is not None:
for f in futures:
f.cancel()
break
[f.result() for f in futures]
Run Code Online (Sandbox Code Playgroud)
只有在执行完所有任务后,所有其他答案才会失败。
既然您已经使用过apply_sync,我猜用例是想要执行一些同步任务。使用回调进行处理是另一种选择。请注意,此选项仅适用于 python3.2 及以上版本,不适用于 python2.7。
from multiprocessing import Pool
def callback(result):
print('success', result)
def callback_error(result):
print('error', result)
def go():
print(1)
raise Exception()
print(2)
p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)
# You can do another things
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)