使用 ndjson 格式的文本文件,以下代码产生了我所期望的。一个 ndjson 文件,其中quotes.USD dict 未嵌套并删除了原始quotes 元素。
def unnest_quotes(element):
element['USDquotes'] = element['quotes']['USD']
del element['quotes']
return element
p = beam.Pipeline(options=pipeline_options)
ReadJson = p | ReadFromText(known_args.input,coder=JsonCoder())
MapFormattedJson = ReadJson | 'Map Function' >> beam.Map(unnest_quotes)
MapFormattedJson | 'Write Map Output' >> WriteToText(known_args.output,coder=JsonCoder())
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试使用 ParDo 实现相同的目标时,我不理解这种行为。
class UnnestQuotes(beam.DoFn):
def process(self,element):
element['USDquotes'] = element['quotes']['USD']
del element['quotes']
return element
p = beam.Pipeline(options=pipeline_options)
ReadJson = p | ReadFromText(known_args.input,coder=JsonCoder())
ClassFormattedJson = ReadJson | 'Pardo' >> beam.ParDo(UnnestQuotes())
ClassFormattedJson | 'Write Class Output' >> WriteToText(known_args.output,coder=JsonCoder())
Run Code Online (Sandbox Code Playgroud)
这将生成一个文件,其中 dict 的每个键位于单独的行上,没有值,如下所示。
"last_updated" …Run Code Online (Sandbox Code Playgroud)