相关疑难解决方法(0)

Python多处理:处理父级中的子错误

我目前正在玩多处理和队列.我编写了一段代码来从mongoDB导出数据,将其映射到关系(平面)结构,将所有值转换为字符串并将它们插入到mysql中.

这些步骤中的每一步都作为一个进程提交并给出导入/导出队列,对于在父进程中处理的mongoDB导出是安全的.

正如您将在下面看到的,我使用队列和子进程在从队列中读取"None"时自行终止.我目前遇到的问题是,如果子进程遇到未处理的异常,则父进程无法识别,其余只是保持运行.我想要发生的是整个shebang退出并且最多再加上孩子的错误.

我有两个问题:

  1. 如何检测父级中的子错误?
  2. 检测到错误后,如何杀死我的子进程(最佳实践)?我意识到将"无"放入队列以杀死孩子非常脏.

我正在使用python 2.7.

以下是我的代码的基本部分:

# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()
Run Code Online (Sandbox Code Playgroud)

[...]

    # create child processes
    # all processes generated here are subclasses of "multiprocessing.Process"

    # create mapper
    mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
               for i in range(10)]

    # create datatype converter, converts everything to str
    converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
                  for i in range(10)]

    # create mysql writer
    # I create a list of writers. currently only …
Run Code Online (Sandbox Code Playgroud)

python error-handling multiprocessing

39
推荐指数
4
解决办法
4万
查看次数

Python:concurrent.futures如何使其可取消?

Python concurrent.futures和ProcessPoolExecutor提供了一个简洁的界面来安排和监视任务.期货甚至提供 .cancel()方法:

cancel():尝试取消通话.如果当前正在执行调用且无法取消,则该方法将返回False,否则将取消调用并且该方法将返回True.

不幸的是,在一个类似的问题(关于asyncio)中,回答声称运行任务是不可取消的,使用这个剪切的文档,但文档不要说,只有它们运行和不可解决.

向进程提交multiprocessing.Events也是不可能的(通过参数执行此操作,如在multiprocess.Process中返回RuntimeError)

我想做什么?我想分区搜索空间并为每个分区运行任务.但它足以拥有一个解决方案,而且这个过程是CPU密集型的.那么有一种实际的舒适方式可以通过使用ProcessPool开始来抵消收益吗?

例:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps) …
Run Code Online (Sandbox Code Playgroud)

python multiprocess concurrent.futures

10
推荐指数
3
解决办法
5818
查看次数