Mic*_*oni 5 python json dictionary google-cloud-dataflow apache-beam
我正在尝试使用Beam管道,以便将SequenceMatcher函数应用于大量单词.我(希望)除了WriteToText部分之外已经找到了所有的东西.
我已经定义了一个自定义ParDo(在此称为ProcessDataDoFn),它接受main_input和side_input,处理它们并输出像这样的字典
{u'key': (u'string', float)}
Run Code Online (Sandbox Code Playgroud)
我的管道非常简单
class ProcessDataDoFn(beam.DoFn):
def process(self, element, side_input):
... Series of operations ...
return output_dictionary
with beam.Pipeline(options=options) as p:
# Main input
main_input = p | 'ReadMainInput' >> beam.io.Read(
beam.io.BigQuerySource(
query=CUSTOM_SQL,
use_standard_sql=True
))
# Side input
side_input = p | 'ReadSideInput' >> beam.io.Read(
beam.io.BigQuerySource(
project=PROJECT_ID,
dataset=DATASET,
table=TABLE
))
output = (
main_input
| 'ProcessData' >> beam.ParDo(
ProcessDataDoFn(),
side_input=beam.pvalue.AsList(side_input))
| 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET)
)
Run Code Online (Sandbox Code Playgroud)
现在的问题是,如果我像这样离开管道,它只输出output_dictionary的键.如果我将ProcessDataDoFn的返回值更改为json.dumps(ouput_dictionary),则Json写得正确但是像这样
{
'
k
e
y
'
:
[
'
s
t
r
i
n
g
'
,
f
l
o
a
t
]
Run Code Online (Sandbox Code Playgroud)
如何正确输出结果?
您的输出看起来很不寻常。json.dumps应该在一行中打印 json,并且应该逐行输出到文件中。
也许为了拥有更清晰的代码,您可以添加一个额外的地图操作,以您需要的方式进行格式化。像这样:
output = (
main_input
| 'ProcessData' >> beam.ParDo(
ProcessDataDoFn(),
side_input=beam.pvalue.AsList(side_input))
| 'FormatOutput' >> beam.Map(json.dumps)
| 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET)
)
Run Code Online (Sandbox Code Playgroud)
我实际上部分解决了这个问题。
我编写的 ParDoFn 返回字典或 JSON 格式的字符串。在这两种情况下,当 Beam 尝试对所述输入执行某些操作时,就会出现问题。如果说 PCollection 是字典,Beam 似乎迭代给定的 PCollection,它只获取它的键,如果说 PCollection 是一个字符串,它迭代所有字符(这就是 JSON 输出如此奇怪的原因)。我发现解决方案相当简单:将字典或字符串封装在列表中。JSON 格式化部分可以在 ParDoFn 级别完成,也可以通过像您展示的那样的转换完成。