ParDo 行为的 Apache Beam 解释

Thi*_*nja 3 python google-cloud-dataflow apache-beam

使用 ndjson 格式的文本文件,以下代码产生了我所期望的。一个 ndjson 文件,其中quotes.USD dict 未嵌套并删除了原始quotes 元素。

  def unnest_quotes(element):
      element['USDquotes'] = element['quotes']['USD']
      del element['quotes']
      return element

  p = beam.Pipeline(options=pipeline_options)
  ReadJson = p | ReadFromText(known_args.input,coder=JsonCoder())
  MapFormattedJson = ReadJson | 'Map Function' >> beam.Map(unnest_quotes)
  MapFormattedJson | 'Write Map Output' >> WriteToText(known_args.output,coder=JsonCoder())
Run Code Online (Sandbox Code Playgroud)

但是,当我尝试使用 ParDo 实现相同的目标时,我不理解这种行为。

  class UnnestQuotes(beam.DoFn):
    def process(self,element):
      element['USDquotes'] = element['quotes']['USD']
      del element['quotes']
      return element

  p = beam.Pipeline(options=pipeline_options)
  ReadJson = p | ReadFromText(known_args.input,coder=JsonCoder())
  ClassFormattedJson = ReadJson | 'Pardo' >> beam.ParDo(UnnestQuotes())
  ClassFormattedJson | 'Write Class Output' >> WriteToText(known_args.output,coder=JsonCoder())
Run Code Online (Sandbox Code Playgroud)

这将生成一个文件,其中 dict 的每个键位于单独的行上,没有值,如下所示。

"last_updated"
"name"
"symbol"
"rank"
"total_supply"
"max_supply"
"circulating_supply"
"website_slug"
"id"
"USDquotes"
Run Code Online (Sandbox Code Playgroud)

就好像 Map 函数生成的 PCollection 是完整的字典,而 Pardo 生成每个键的 PCollection。

我知道我只能使用 map 函数,但我需要了解这种行为,以便将来我确实需要使用 ParDo 时。

Thi*_*nja 5

我在这个答案的帮助下想通了这一点。 apache 光束平面图与地图

因为我所经历的与 FlatMap 和 Map 之间的区别相同。为了获得所需的行为,我所需要做的就是将 Pardo 的返回值包装在一个列表中。

  class UnnestQuotes(beam.DoFn):
    def process(self,element):
      element['USDquotes'] = element['quotes']['USD']
      del element['quotes']
      return [element]
Run Code Online (Sandbox Code Playgroud)