Dan*_*all 26 python multithreading multiprocessing try-catch-finally
我的理解是,如果输入了try,则必须*始终*执行finally子句.
import random
from multiprocessing import Pool
from time import sleep
def Process(x):
try:
print x
sleep(random.random())
raise Exception('Exception: ' + x)
finally:
print 'Finally: ' + x
Pool(3).map(Process, ['1','2','3'])
Run Code Online (Sandbox Code Playgroud)
预期的输出是对于每个由第8行单独打印的x,必须出现'Finally x'.
示例输出:
$ python bug.py
1
2
3
Finally: 2
Traceback (most recent call last):
File "bug.py", line 14, in <module>
Pool(3).map(Process, ['1','2','3'])
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 225, in map
return self.map_async(func, iterable, chunksize).get()
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 522, in get
raise self._value
Exception: Exception: 2
Run Code Online (Sandbox Code Playgroud)
似乎终止一个进程的异常终止了父进程和兄弟进程,即使在其他进程中还有其他工作需要完成.
为什么我错了?为什么这是正确的?如果这是正确的,怎么要一个安全清理多进程的Python资源?
unu*_*tbu 34
简短回答:SIGTERM特朗普finally.
答案很长:打开日志记录mp.log_to_stderr():
import random
import multiprocessing as mp
import time
import logging
logger=mp.log_to_stderr(logging.DEBUG)
def Process(x):
try:
logger.info(x)
time.sleep(random.random())
raise Exception('Exception: ' + x)
finally:
logger.info('Finally: ' + x)
result=mp.Pool(3).map(Process, ['1','2','3'])
Run Code Online (Sandbox Code Playgroud)
日志输出包括:
[DEBUG/MainProcess] terminating workers
Run Code Online (Sandbox Code Playgroud)
这与以下代码对应multiprocessing.pool._terminate_pool:
if pool and hasattr(pool[0], 'terminate'):
debug('terminating workers')
for p in pool:
p.terminate()
Run Code Online (Sandbox Code Playgroud)
每个p中pool是一个multiprocessing.Process,并调用terminate(至少在非Windows机器)调用SIGTERM:
来自multiprocessing/forking.py:
class Popen(object)
def terminate(self):
...
try:
os.kill(self.pid, signal.SIGTERM)
except OSError, e:
if self.wait(timeout=0.1) is None:
raise
Run Code Online (Sandbox Code Playgroud)
因此,它归结为当try套件中的Python进程被发送时会发生什么SIGTERM.
请考虑以下示例(test.py):
import time
def worker():
try:
time.sleep(100)
finally:
print('enter finally')
time.sleep(2)
print('exit finally')
worker()
Run Code Online (Sandbox Code Playgroud)
如果你运行它,然后发送它SIGTERM,然后过程立即结束,而不进入finally套件,没有输出证明,没有延迟.
在一个终端:
% test.py
Run Code Online (Sandbox Code Playgroud)
在第二个终端:
% pkill -TERM -f "test.py"
Run Code Online (Sandbox Code Playgroud)
结果在第一个终端:
Terminated
Run Code Online (Sandbox Code Playgroud)
将其与发送进程a SIGINT(C-c)时发生的情况进行比较:
在第二个终端:
% pkill -INT -f "test.py"
Run Code Online (Sandbox Code Playgroud)
结果在第一个终端:
enter finally
exit finally
Traceback (most recent call last):
File "/home/unutbu/pybin/test.py", line 14, in <module>
worker()
File "/home/unutbu/pybin/test.py", line 8, in worker
time.sleep(100)
KeyboardInterrupt
Run Code Online (Sandbox Code Playgroud)
结论:SIGTERM王牌finally.
小智 5
unutbu 的答案明确解释了为什么你会出现你所观察到的行为。然而,应该强调的是,SIGTERM 的发送只是因为它multiprocessing.pool._terminate_pool是如何实现的。如果您可以避免使用Pool,那么您可以获得您想要的行为。这是一个借来的例子:
from multiprocessing import Process
from time import sleep
import random
def f(x):
try:
sleep(random.random()*10)
raise Exception
except:
print "Caught exception in process:", x
# Make this last longer than the except clause in main.
sleep(3)
finally:
print "Cleaning up process:", x
if __name__ == '__main__':
processes = []
for i in range(4):
p = Process(target=f, args=(i,))
p.start()
processes.append(p)
try:
for process in processes:
process.join()
except:
print "Caught exception in main."
finally:
print "Cleaning up main."
Run Code Online (Sandbox Code Playgroud)
发送 SIGINT 后,示例输出为:
Caught exception in process: 0
^C
Cleaning up process: 0
Caught exception in main.
Cleaning up main.
Caught exception in process: 1
Caught exception in process: 2
Caught exception in process: 3
Cleaning up process: 1
Cleaning up process: 2
Cleaning up process: 3
Run Code Online (Sandbox Code Playgroud)
请注意,该finally子句针对所有进程运行。如果您需要共享内存,请考虑使用Queue、Pipe、Manager或某些外部存储,例如redis或sqlite3。