如何通过python读取apache beam(数据流)中的JSON文件?

Naa*_*ary 4 python google-cloud-platform google-cloud-dataflow apache-beam

我正在尝试通过 python 中的 apache beam 读取 JSON 文件,并对其应用一些数据质量规则。目前我正在使用 beam.io.ReadFromText 读取每个 json 行并使用一些函数来修改数据。读取 JSON 数据并修改它们的更好方法是什么?

(p
  | 'Getdata' >> beam.io.ReadFromText(input)
  | 'filter_name' >> beam.FlatMap(lambda line: dq_name(line))
  | 'filter_phone' >> beam.FlatMap(lambda line: dq_phone(line))
  | 'filter_zip' >> beam.FlatMap(lambda line: dq_zip(line))
  | 'filter_address' >> beam.FlatMap(lambda line: dq_city(line))
  | 'filter_website' >> beam.FlatMap(lambda line: dq_website(line))
  | 'write' >> beam.io.WriteToText(output_prefix)  )
Run Code Online (Sandbox Code Playgroud)

注意:我对此还很陌生,如果我当前的方法看起来太粗制滥造,我很抱歉。

Joh*_*ley 6

您正在从错误的方向接近 Apache Beam(数据流)。

您正在尝试读取一行,然后一次对这一行应用转换。

相反,您需要将 Beam 视为并行处理器。您将读入所有行ReadFromText(),然后并行地将变换应用于每行。

查看函数beam.ParDo()。这将允许您创建一个可以处理 JSON 文件每一行的类。然后,您的代码将包含诸如ReadFromText(), ParDo(MyJsonProcessor()), 之类的主要步骤WriteToText()

请记住,您的 JSON 需要是换行符分隔的 JSON。http://ndjson.org/