我目前正在玩多处理和队列.我编写了一段代码来从mongoDB导出数据,将其映射到关系(平面)结构,将所有值转换为字符串并将它们插入到mysql中.
这些步骤中的每一步都作为一个进程提交并给出导入/导出队列,对于在父进程中处理的mongoDB导出是安全的.
正如您将在下面看到的,我使用队列和子进程在从队列中读取"None"时自行终止.我目前遇到的问题是,如果子进程遇到未处理的异常,则父进程无法识别,其余只是保持运行.我想要发生的是整个shebang退出并且最多再加上孩子的错误.
我有两个问题:
我正在使用python 2.7.
以下是我的代码的基本部分:
# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()
Run Code Online (Sandbox Code Playgroud)
[...]
# create child processes
# all processes generated here are subclasses of "multiprocessing.Process"
# create mapper
mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
for i in range(10)]
# create datatype converter, converts everything to str
converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
for i in range(10)]
# create mysql writer
# I create a list of writers. currently only …Run Code Online (Sandbox Code Playgroud) Python concurrent.futures和ProcessPoolExecutor提供了一个简洁的界面来安排和监视任务.期货甚至提供 .cancel()方法:
cancel():尝试取消通话.如果当前正在执行调用且无法取消,则该方法将返回False,否则将取消调用并且该方法将返回True.
不幸的是,在一个类似的问题(关于asyncio)中,回答声称运行任务是不可取消的,使用这个剪切的文档,但文档不要说,只有它们运行和不可解决.
向进程提交multiprocessing.Events也是不可能的(通过参数执行此操作,如在multiprocess.Process中返回RuntimeError)
我想做什么?我想分区搜索空间并为每个分区运行任务.但它足以拥有一个解决方案,而且这个过程是CPU密集型的.那么有一种实际的舒适方式可以通过使用ProcessPool开始来抵消收益吗?
例:
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait
# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 135135515:
return elem
return False
futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps) …Run Code Online (Sandbox Code Playgroud)