Bot*_*ond 4 python asynchronous multiprocessing python-multiprocessing
我开始熟悉 Python 的multiprocessing模块。以下代码按预期工作:
#outputs 0 1 2 3
from multiprocessing import Pool
def run_one(x):
print x
return
pool = Pool(processes=12)
for i in range(4):
pool.apply_async(run_one, (i,))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
但是,现在,如果我围绕上述代码包装一个函数,print则不会执行语句(或至少重定向输出):
#outputs nothing
def run():
def run_one(x):
print x
return
pool = Pool(processes=12)
for i in range(4):
pool.apply_async(run_one, (i,))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
如果我将run_one定义移到之外run,则输出再次是预期的,当我调用时run():
#outputs 0 1 2 3
def run_one(x):
print x
return
def run():
pool = Pool(processes=12)
for i in range(4):
pool.apply_async(run_one, (i,))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
我在这里缺少什么?为什么第二个片段不打印任何东西?如果我只是调用该run_one(i)函数而不是使用apply_async,则所有三个代码输出相同。
池需要腌制(序列化)它发送到其工作进程的所有内容。Pickling 实际上只保存函数的名称,而 unpickling 需要按名称重新导入函数。为此,该函数需要在顶级定义,嵌套函数将不能被子进程导入,并且已经尝试对它们进行腌制会引发异常:
from multiprocessing.connection import _ForkingPickler
def run():
def foo(x):
pass
_ForkingPickler.dumps(foo) # multiprocessing custom pickler;
# same effect with pickle.dumps(foo)
run()
# Out:
Traceback (most recent call last):
...
AttributeError: Can't pickle local object 'run.<locals>.foo'
Run Code Online (Sandbox Code Playgroud)
您没有看到异常的原因是,因为Pool在父级中的酸洗任务期间已经开始捕获异常,并且只有在您调用调用时立即获得.get()的AsyncResult对象时才会重新引发它们pool.apply_async()。
这就是为什么(使用 Python 2)你最好总是这样使用它,即使你的目标函数没有返回任何东西(仍然返回隐式None):
results = [pool.apply_async(foo, (i,)) for i in range(4)]
# `pool.apply_async()` immediately returns AsyncResult (ApplyResult) object
for res in results:
res.get()
Run Code Online (Sandbox Code Playgroud)
非异步池方法喜欢Pool.map()和Pool.starmap()使用相同的(异步)底层函数,就像它们的异步兄弟一样,但它们额外调用.get()你,所以你总是会看到这些方法的异常。
Python 3 有一个error_callback用于异步池方法的参数,您可以使用它来处理异常。
| 归档时间: |
|
| 查看次数: |
1944 次 |
| 最近记录: |