Python多处理早期终止

fir*_*ter 4 python multiprocessing python-3.x

我的脚本运行时,有时可能会发生错误。在这种情况下,应正确终止所有进程,应返回错误消息,并退出脚本。

我现在拥有的代码似乎尚未满足这些要求。发生错误时,会将其发送到report_error(),该脚本最终挂在终端中,活动监视器显示许多Python进程仍在运行。

环境

  • Mac OS X 10.8.5
  • Python 3.3.3

从脚本中的任意点终止所有进程的正确方法是什么?

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import sys
from multiprocessing import Pool


# Global variables.

input_files = [
    'test_data_0.csv',
    'test_data_1.csv'
]


def report_error(error):

    # Reports errors then exits script.
    print("Error: {0}".format(error), file=sys.stderr)
    sys.exit(1)

    # What I really want is to report the error, properly terminate all processes,
    # and then exit the script.


def read_file(file):

    try:
        # Read file into list.
    except Exception as error:
        report_error(error)


def check_file(file):

    # Do some error checking on file.
    if error:
        report_error(error)


def job(file):

    # Executed on each item in input_files.

    check_file(file)
    read_file(file)


def main():

    # Sets up a process pool. Defaults to number of cores.
    # Each input gets passed to job and processed in a separate process.
    p = Pool()
    p.map(job, input_files)

    # Closing and joining a pool is important to ensure all resources are freed properly.
    p.close()
    p.join()


if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

dan*_*ano 5

首先,使用sys.exit()杀死子工作进程实际上会破坏池,并使map命令永远挂起。目前multiprocessing无法正常从工作进程崩溃时,工人正在处理作业恢复(有一个bug报告与一个补丁解决了这个问题在这里,什么它的价值)。

您可以通过两种方法来完成实际要做的事情。由于您似乎并不在乎从worker函数返回的值,因此最简单的方法是使用imap_unordered而不是map,在出现故障时从worker引发异常,然后简单地迭代由imap_unordered以下方法返回的迭代器:

def report_error(error):

    # Reports errors then exits script.
    print("Error: {0}".format(error), file=sys.stderr)
    raise error # Raise the exception

...

def main():
    p = Pool()
    try:
        list(p.imap_unordered(job, input_files))
    except Exception:
        print("a worker failed, aborting...")
        p.close()
        p.terminate()
    else:
        p.close()
        p.join()

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

使用imap_unordered,一旦孩子发送结果,结果将返回给父母。因此,如果子级将异常发送回父级,它将立即在父级流程中重新引发。我们捕获到该异常,打印一条消息,然后终止该池。