Python: multiprocessing.map: If one process raises an exception, why aren't the other processes' finally blocks called?

Dan*_*all 26 python multithreading multiprocessing try-catch-finally

My understanding is that the finally clause must *always* be executed once the try has been entered.

import random

from multiprocessing import Pool
from time import sleep

def Process(x):
  try:
    print x
    sleep(random.random())
    raise Exception('Exception: ' + x)
  finally:
    print 'Finally: ' + x

Pool(3).map(Process, ['1','2','3'])

The expected output is that for each x printed on its own by line 8, there must be a matching 'Finally: x'.

Example output:

$ python bug.py 
1
2
3
Finally: 2
Traceback (most recent call last):
  File "bug.py", line 14, in <module>
    Pool(3).map(Process, ['1','2','3'])
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 225, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 522, in get
    raise self._value
Exception: Exception: 2

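It seems that an exception terminating one process also terminates the parent and sibling processes, even though there is still work to be done in the other processes.

Why am I wrong? Why is this correct? If this is correct, how should one safely clean up resources in multiprocess Python?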

unu*_*tbu 34

Short answer: SIGTERM trumps finally.

Long answer: Turn on logging with mp.log_to_stderr():

import random
import multiprocessing as mp
import time
import logging

logger=mp.log_to_stderr(logging.DEBUG)

def Process(x):
    try:
        logger.info(x)
        time.sleep(random.random())
        raise Exception('Exception: ' + x)
    finally:
        logger.info('Finally: ' + x)

result=mp.Pool(3).map(Process, ['1','2','3'])

The logging output includes:

[DEBUG/MainProcess] terminating workers

This corresponds to the following code in multiprocessing.pool._terminate_pool:

    if pool and hasattr(pool[0], 'terminate'):
        debug('terminating workers')
        for p in pool:
            p.terminate()

Each p in pool is a multiprocessing.Process, and calling terminate (at least on non-Windows machines) sends SIGTERM:

From multiprocessing/forking.py:

class Popen(object):
    def terminate(self):
        ...
            try:
                os.kill(self.pid, signal.SIGTERM)
            except OSError, e:
                if self.wait(timeout=0.1) is None:
                    raise

So it comes down to what happens when a Python process inside a try suite is sent a SIGTERM.
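The same question can be asked from within Python. The following is a minimal sketch, assuming Python 3 on a POSIX system where the 'fork' start method is available; the names sleeper and terminate_and_report are made up for illustration. It terminates a child that is sleeping inside a try suite, then reports the child's exit code and whether any message from the finally suite arrived.

```python
import multiprocessing as mp
import time


def sleeper(conn):
    # Hypothetical worker: announces itself, then sleeps inside try.
    try:
        conn.send('started')
        time.sleep(100)
    finally:
        # Not expected to be reached when the default SIGTERM action kills us.
        conn.send('finally ran')


def terminate_and_report():
    # Assumption: POSIX, so the 'fork' start method exists.
    ctx = mp.get_context('fork')
    parent_conn, child_conn = ctx.Pipe()
    p = ctx.Process(target=sleeper, args=(child_conn,))
    p.start()
    parent_conn.recv()            # wait for 'started'
    p.terminate()                 # sends SIGTERM on POSIX
    p.join()
    got_finally = parent_conn.poll(0.5)  # True only if finally sent a message
    return p.exitcode, got_finally


if __name__ == '__main__':
    print(terminate_and_report())
```

A negative exit code -N means the child was killed by signal N, so an exit code equal to -signal.SIGTERM together with no message from finally would be consistent with the explanation above.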

Consider the following example (test.py):

import time    
def worker():
    try:
        time.sleep(100)        
    finally:
        print('enter finally')
        time.sleep(2) 
        print('exit finally')    
worker()

If you run it and then send it a SIGTERM, the process ends immediately without entering the finally suite, as evidenced by the lack of output and the lack of delay.

In one terminal:

% test.py

In a second terminal:

% pkill -TERM -f "test.py"

Result in the first terminal:

Terminated

Compare that with what happens when the process is sent a SIGINT (C-c):

In the second terminal:

% pkill -INT -f "test.py"

Result in the first terminal:

enter finally
exit finally
Traceback (most recent call last):
  File "/home/unutbu/pybin/test.py", line 14, in <module>
    worker()
  File "/home/unutbu/pybin/test.py", line 8, in worker
    time.sleep(100)        
KeyboardInterrupt

Conclusion: SIGTERM trumps finally.
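One possible workaround, not part of the original answer, is to install a SIGTERM handler in the child that raises SystemExit, so the signal unwinds the stack like an ordinary exception. The sketch below assumes Python 3 on POSIX with the 'fork' start method; worker, demo and _exit_on_sigterm are illustrative names only.

```python
import multiprocessing as mp
import signal
import sys
import time


def _exit_on_sigterm(signum, frame):
    # Raise SystemExit so that try/finally suites get a chance to run.
    sys.exit(1)


def worker(conn):
    signal.signal(signal.SIGTERM, _exit_on_sigterm)
    try:
        conn.send('ready')        # handler is installed, tell the parent
        time.sleep(100)
    finally:
        conn.send('finally ran')  # cleanup would go here


def demo():
    ctx = mp.get_context('fork')  # assumption: POSIX only
    parent_conn, child_conn = ctx.Pipe()
    p = ctx.Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.recv()            # wait for 'ready'
    p.terminate()                 # SIGTERM now triggers the handler
    message = parent_conn.recv() if parent_conn.poll(5) else None
    p.join()
    return message, p.exitcode


if __name__ == '__main__':
    print(demo())
```

This only helps against SIGTERM; a SIGKILL cannot be caught, so cleanup that must never be skipped probably should not rely on finally alone.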


小智 5

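unutbu's answer clearly explains why you see the behavior you observe. However, it should be emphasized that SIGTERM is sent only because of how multiprocessing.pool._terminate_pool is implemented. If you can avoid using Pool, then you can get the behavior you want. Here is a borrowed example: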

from multiprocessing import Process
from time import sleep
import random

def f(x):
    try:
        sleep(random.random()*10)
        raise Exception
    except:
        print "Caught exception in process:", x
        # Make this last longer than the except clause in main.
        sleep(3)
    finally:
        print "Cleaning up process:", x

if __name__ == '__main__':
    processes = []
    for i in range(4):
        p = Process(target=f, args=(i,))
        p.start()
        processes.append(p)
    try:
        for process in processes:
            process.join()
    except:
        print "Caught exception in main."
    finally:
        print "Cleaning up main."

Example output after sending SIGINT:

Caught exception in process: 0
^C
Cleaning up process: 0
Caught exception in main.
Cleaning up main.
Caught exception in process: 1
Caught exception in process: 2
Caught exception in process: 3
Cleaning up process: 1
Cleaning up process: 2
Cleaning up process: 3

Note that the finally clause is run for all processes. If you need shared memory, consider using Queue, Pipe, Manager, or some external store such as redis or sqlite3.
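If Pool cannot be avoided, another option may be to catch the exception from map in the parent and then call close() followed by join(), which lets the workers finish their outstanding tasks instead of being terminated when the parent exits. The following is only a sketch under assumptions: Python 3, a POSIX 'fork' start method, and a module-level SimpleQueue (cleaned_up, a name invented here) that the forked workers inherit and use to report that their finally suite ran.

```python
import multiprocessing as mp
import random
import time

ctx = mp.get_context('fork')      # assumption: POSIX only
cleaned_up = ctx.SimpleQueue()    # inherited by the forked workers


def process(x):
    try:
        time.sleep(random.random() / 10)
        raise Exception('Exception: ' + x)
    finally:
        cleaned_up.put(x)         # stands in for real cleanup work


def run(items):
    pool = ctx.Pool(3)
    error = None
    try:
        pool.map(process, items, chunksize=1)
    except Exception as exc:
        error = exc               # remember the failure, keep the pool alive
    finally:
        pool.close()              # no new tasks
        pool.join()               # wait for workers to finish and exit
    finished = []
    while not cleaned_up.empty():
        finished.append(cleaned_up.get())
    return error, sorted(finished)


if __name__ == '__main__':
    print(run(['1', '2', '3']))
```

The design choice here is to keep the parent alive until the workers have exited on their own, so that _terminate_pool has nothing left to terminate.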