我目前正在玩多处理和队列.我编写了一段代码来从mongoDB导出数据,将其映射到关系(平面)结构,将所有值转换为字符串并将它们插入到mysql中.
这些步骤中的每一步都作为一个进程提交并给出导入/导出队列,对于在父进程中处理的mongoDB导出是安全的.
正如您将在下面看到的,我使用队列和子进程在从队列中读取"None"时自行终止.我目前遇到的问题是,如果子进程遇到未处理的异常,则父进程无法识别,其余只是保持运行.我想要发生的是整个shebang退出并且最多再加上孩子的错误.
我有两个问题:
我正在使用python 2.7.
以下是我的代码的基本部分:
# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()
Run Code Online (Sandbox Code Playgroud)
[...]
# create child processes
# all processes generated here are subclasses of "multiprocessing.Process"
# create mapper
mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
for i in range(10)]
# create datatype converter, converts everything to str
converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
for i in range(10)]
# create mysql writer
# I create a list of writers. currently only …Run Code Online (Sandbox Code Playgroud) 我们正在将csv文件加载到BigQuery中.每个文件都将创建一个单独的表.
当我们从这些表中选择时,我们主要使用表查询liek这样做:
SELECT foo, bar
FROM TABLE_QUERY(name_stub,'table_id CONTAINS "_something" and msec_to_timestamp(creation_time) > date_add(current_timestamp(), -90, "day")'));
Run Code Online (Sandbox Code Playgroud)
现在我们已经为新文件添加了新字段.因此,我们现在还有"baz",而不仅仅是字段"foo"和"bar".
当我运行以下查询时,我得到错误,其中一个较旧的表上不存在字段"baz".
SELECT foo, bar, baz
FROM TABLE_QUERY(name_stub,'table_id CONTAINS "_something" and msec_to_timestamp(creation_time) > date_add(current_timestamp(), -90, "day")'));
Run Code Online (Sandbox Code Playgroud)
有没有办法选择"baz"并且只有没有列的表的默认值?