Python Multiprocessing: Handling Child Errors in the Parent

dru*_*key 39 python error-handling multiprocessing

I am currently playing around with multiprocessing and queues. I have written a piece of code to export data from mongoDB, map it onto a relational (flat) structure, convert all values to strings and insert them into mysql.

Each of these steps is submitted as a process and given import/export queues, except for the mongoDB export, which is handled in the parent.

As you will see below, I use queues, and the child processes terminate themselves when they read "None" from the queue. The problem I currently have is that if a child process runs into an unhandled exception, this is not recognized by the parent and the rest just keeps running. What I want to happen is that the whole shebang quits and at best re-raises the child error.
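
A minimal sketch of that failure mode (simplified for illustration; the worker function here is hypothetical, not part of my actual pipeline):

import multiprocessing

def worker(q):
    raise ValueError('boom')  # unhandled exception in the child

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()           # returns normally; the traceback only goes to stderr
    print(p.exitcode)  # 1, but nothing is raised in the parent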

I have two questions:

  1. How do I detect the child error in the parent?
  2. How do I kill my child processes after detecting the error (best practice)? I realize that putting "None" on the queue to kill the child is pretty dirty.

I am using python 2.7.

Here are the essential parts of my code:

# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()

[...]

    # create child processes
    # all processes generated here are subclasses of "multiprocessing.Process"

    # create mapper
    mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
               for i in range(10)]

    # create datatype converter, converts everything to str
    converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
                  for i in range(10)]

    # create mysql writer
    # I create a list of writers. currently only one, 
    # but I have the option to parallelize it further
    writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q
               , columns, 'w_'+mysql_table, 1000) for i in range(1)]

    # starting mapper
    for mapper in mappers:
        mapper.start()
    time.sleep(1)

    # starting converter
    for converter in converters:
        converter.start()

    # starting writer
    for writer in writers:
        writer.start()

[... initialize the mongo db connection ...]

    # put each dataset read to queue for the mapper
    for row in mongo_collection.find({inc_column: {"$gte": start}}):
        mongo_input_result_q.put(row)
        count += 1
        if count % log_counter == 0:
            print 'Mongo Reader' + " " + str(count)
    print "MongoReader done"

    # Processes are terminated when they read "None" object from queue
    # now that reading is finished, put None for each mapper in the queue so they terminate themselves
    # the same for all followup processes
    for mapper in mappers:
        mongo_input_result_q.put(None)
    for mapper in mappers:
        mapper.join()
    for converter in converters:
        mapper_result_q.put(None)
    for converter in converters:
        converter.join()
    for writer in writers:
        converter_result_q.put(None)
    for writer in writers:
        writer.join()

Kob*_*ohn 29

I don't know of a standard practice, but what I have found is that to get reliable multiprocessing I design the methods/classes/etc. specifically to work with multiprocessing. Otherwise you never really know what is going on on the other side (unless I have missed some mechanism for this).

Specifically, what I do is:

  • Subclass multiprocessing.Process or make functions that specifically support multiprocessing (wrapping functions that you don't have control over when necessary)
  • Always provide a shared error multiprocessing.Queue from the main process to each worker process
  • Enclose the entire run code in a try: ... except Exception as e. Then, when something unexpected happens, send an error package with:
    • the process id that died
    • the exception with its original context (check here). The original context is really important if you want to log useful information in the main process.
  • Of course, handle expected issues as normal within the normal operation of the worker
  • (similar to what you said already) assuming a long-running process, wrap the running code (inside the try/catch-all) with a loop
    • define a stop token in the class or for the function
    • when the main process wants the worker(s) to stop, just send the stop token; to stop everyone, send enough for all the processes
    • the wrapping loop checks the input q for the token or whatever other input you want

The end result is worker processes that can survive for a long time and that can let you know what is happening when something goes wrong. They will die quietly, since you can handle whatever you need to do after the catch-all exception, and you will also know when you need to restart a worker.

Again, I just came to this pattern through trial and error, so I don't know how standard it is. A rough sketch of it follows. Does that help with what you are asking for?
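
A self-contained sketch of the pattern described above (my reconstruction; the names STOP and worker are illustrative, not from a real library):

import multiprocessing
import os
import traceback

STOP = 'STOP'  # stop token agreed on by the parent and the workers

def worker(in_q, error_q):
    try:
        while True:                  # wrapping loop
            item = in_q.get()
            if item == STOP:         # stop token received: die quietly
                break
            # ... normal work on item; expected issues handled here ...
    except Exception:
        # error package: the dead process id plus the formatted traceback
        # (the traceback object itself cannot be pickled)
        error_q.put((os.getpid(), traceback.format_exc()))

if __name__ == '__main__':
    in_q = multiprocessing.Queue()
    error_q = multiprocessing.Queue()    # shared error queue to each worker
    workers = [multiprocessing.Process(target=worker, args=(in_q, error_q))
               for _ in range(4)]
    for w in workers:
        w.start()
    # ... feed in_q with real work ...
    for _ in workers:                    # one stop token per worker
        in_q.put(STOP)
    for w in workers:
        w.join()
    while not error_q.empty():           # check for child failures in the parent
        pid, tb = error_q.get()
        print('worker %d died:\n%s' % (pid, tb))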

  • You emphasize "send an error package with the exception in its original context", which to me implies the traceback object used as the third argument of a raise statement. But traceback objects are not picklable, so they cannot be sent through a `multiprocessing.Queue`. How do you get the context back to the parent process? (2 upvotes)

mrk*_*wjc 26

Why not let the Process take care of its own exceptions, like this:

import multiprocessing as mp
import traceback

class Process(mp.Process):
    def __init__(self, *args, **kwargs):
        mp.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = mp.Pipe()  # parent end / child end of the error channel
        self._exception = None

    def run(self):
        try:
            mp.Process.run(self)
            self._cconn.send(None)  # signal a clean exit
        except Exception as e:
            tb = traceback.format_exc()  # the traceback object itself is not picklable
            self._cconn.send((e, tb))
            # raise e  # You can still raise this exception if you need to

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception

And now you have your error and the traceback at hand:

def target():
    raise ValueError('Something went wrong...')

p = Process(target = target)
p.start()
p.join()

if p.exception:
    error, traceback = p.exception
    print traceback

Regards, Marek

  • Just tested it, and it works as expected (python 3.7). Just modify the print statement. (2 upvotes)
  • This code will deadlock if the exception is too big (the message and/or stack trace too long). The receiving end must call `Pipe.recv()` regularly, otherwise `Pipe.send()` blocks once the internal buffer is full. `join()` waits forever for the child to exit, while the child waits forever for the parent to do the `recv()`, which only happens after `join()` completes. (2 upvotes)
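
One way around that deadlock (my own sketch, not from the comment; it reuses the Process subclass and the target function above) is to keep draining the pipe while waiting, instead of joining unconditionally:

p = Process(target=target)
p.start()
while p.is_alive():
    if p.exception:       # polls the pipe, so the child's send() can complete
        break
    p.join(timeout=0.1)   # wait a little before polling again
p.join()                  # the child can exit now; reap it

if p.exception:
    error, tb = p.exception
    print(tb)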

Tit*_*ter 7

The solution from @mrkwjc is simple, easy to understand and easy to implement, but it has one disadvantage: when we have several processes and want to stop all of them as soon as any one of them errors, we need to wait until all processes have finished before we can check `p.exception`. Below is code which fixes this problem (i.e. when one child has an error, we terminate the other children as well):

import multiprocessing
import traceback

from time import sleep


class Process(multiprocessing.Process):
    """
    Class which returns child Exceptions to Parent.
    /sf/answers/2351997721/
    """

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._child_conn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._child_conn.send((e, tb))
            # raise e  # You can still rise this exception if you need to

    @property
    def exception(self):
        if self._parent_conn.poll():
            self._exception = self._parent_conn.recv()
        return self._exception


class Task_1:
    def do_something(self, queue):
        queue.put(dict(users=2))


class Task_2:
    def do_something(self, queue):
        queue.put(dict(users=5))


def main():
    try:
        task_1 = Task_1()
        task_2 = Task_2()

        # Example of multiprocessing which is used:
        # https://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
        task_1_queue = multiprocessing.Queue()
        task_2_queue = multiprocessing.Queue()

        task_1_process = Process(
            target=task_1.do_something,
            kwargs=dict(queue=task_1_queue))

        task_2_process = Process(
            target=task_2.do_something,
            kwargs=dict(queue=task_2_queue))

        task_1_process.start()
        task_2_process.start()

        while task_1_process.is_alive() or task_2_process.is_alive():
            sleep(10)

            if task_1_process.exception:
                error, task_1_traceback = task_1_process.exception

                # Do not wait until task_2 is finished
                task_2_process.terminate()

                raise ChildProcessError(task_1_traceback)

            if task_2_process.exception:
                error, task_2_traceback = task_2_process.exception

                # Do not wait until task_1 is finished
                task_1_process.terminate()

                raise ChildProcessError(task_2_traceback)

        task_1_process.join()
        task_2_process.join()

        task_1_results = task_1_queue.get()
        task_2_results = task_2_queue.get()

        task_1_users = task_1_results['users']
        task_2_users = task_2_results['users']

    except Exception:
        # Here usually I send email notification with error.
        print('traceback:', traceback.format_exc())


if __name__ == "__main__":
    main()


dru*_*key 6

Thanks to kobejohn I have found a solution which is nice and stable.

  1. I created a subclass of multiprocessing.Process which implements some functions and overwrites the run() method to wrap a new saferun method in a try-catch block. This class requires a feedback_queue for initialization, which is used to report info, debug and error messages back to the parent. The log methods in the class are wrappers for the globally defined log functions of the package:

    class EtlStepProcess(multiprocessing.Process):

        def __init__(self, feedback_queue):
            multiprocessing.Process.__init__(self)
            self.feedback_queue = feedback_queue

        def log_info(self, message):
            log_info(self.feedback_queue, message, self.name)

        def log_debug(self, message):
            log_debug(self.feedback_queue, message, self.name)

        def log_error(self, err):
            log_error(self.feedback_queue, err, self.name)

        def saferun(self):
            """Method to be run in sub-process; can be overridden in sub-class"""
            if self._target:
                self._target(*self._args, **self._kwargs)

        def run(self):
            try:
                self.saferun()
            except Exception as e:
                self.log_error(e)
                raise e
            return
    
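    The globally defined log functions and the Terminator poison pill are not shown above. A minimal sketch consistent with how the feedback messages are consumed in the main file below (the field names are taken from there; the rest is my assumption about the package internals):

    import time

    class Terminator(object):
        """Poison pill object put on a queue to tell a worker to shut down."""
        pass

    def _log(feedback_queue, msg_type, message, process_name):
        # field names match what the main process reads from the feedback queue
        feedback_queue.put({"type": msg_type,
                            "timestamp": time.time(),
                            "process": process_name,
                            "message": message})

    def log_info(feedback_queue, message, process_name):
        _log(feedback_queue, "info", message, process_name)

    def log_debug(feedback_queue, message, process_name):
        _log(feedback_queue, "debug", message, process_name)

    def log_error(feedback_queue, err, process_name):
        _log(feedback_queue, "error", str(err), process_name)
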
  2. I subclassed all my other process steps from EtlStepProcess. The code to be run is implemented in the saferun() method rather than in run(). This way I do not have to add a try catch block around it, since this is already done by the run() method. Example:

    class MySqlWriter(EtlStepProcess):

        def __init__(self, mysql_host, mysql_user, mysql_passwd, mysql_schema, mysql_table, columns, commit_count,
                     input_queue, feedback_queue):
            EtlStepProcess.__init__(self, feedback_queue)
            self.mysql_host = mysql_host
            self.mysql_user = mysql_user
            self.mysql_passwd = mysql_passwd
            self.mysql_schema = mysql_schema
            self.mysql_table = mysql_table
            self.columns = columns
            self.commit_count = commit_count
            self.input_queue = input_queue

        def saferun(self):
            self.log_info(self.name + " started")
            # create mysql connection
            engine = sqlalchemy.create_engine('mysql://' + self.mysql_user + ':' + self.mysql_passwd + '@' + self.mysql_host + '/' + self.mysql_schema)
            meta = sqlalchemy.MetaData()
            table = sqlalchemy.Table(self.mysql_table, meta, autoload=True, autoload_with=engine)
            connection = engine.connect()
            try:
                self.log_info("start MySQL insert")
                counter = 0
                row_list = []
                while True:
                    next_row = self.input_queue.get()
                    if isinstance(next_row, Terminator):
                        if counter % self.commit_count != 0:
                            connection.execute(table.insert(), row_list)
                        # Poison pill means we should exit
                        break
                    row_list.append(next_row)
                    counter += 1
                    if counter % self.commit_count == 0:
                        connection.execute(table.insert(), row_list)
                        del row_list[:]
                        self.log_debug(self.name + ' ' + str(counter))

            finally:
                connection.close()
            return
    
  3. In my main file, I submit a Process that does all the work and give it a feedback_queue. This process starts all the steps and then reads from mongoDB and puts values into the initial queue. My main process listens to the feedback queue and prints all log messages. If it receives an error log, it prints the error and terminates its child, which in turn terminates all of its children before dying.

    if __name__ == '__main__':
        feedback_q = multiprocessing.Queue()
        p = multiprocessing.Process(target=mongo_python_export, args=(feedback_q,))
        p.start()

        while p.is_alive():
            fb = feedback_q.get()
            if fb["type"] == "error":
                p.terminate()
                print "ERROR in " + fb["process"] + "\n"
                for child in multiprocessing.active_children():
                    child.terminate()
            else:
                print datetime.datetime.fromtimestamp(fb["timestamp"]).strftime('%Y-%m-%d %H:%M:%S') + " " + \
                      fb["process"] + ": " + fb["message"]

        p.join()
    

I am thinking about making a module out of it and putting it up on github, but I have to do some cleanup and commenting first.