Naa*_*ary 4 python google-cloud-platform google-cloud-dataflow apache-beam
我正在尝试通过 python 中的 apache beam 读取 JSON 文件,并对其应用一些数据质量规则。目前我正在使用 beam.io.ReadFromText 读取每个 json 行并使用一些函数来修改数据。读取 JSON 数据并修改它们的更好方法是什么?
(p
| 'Getdata' >> beam.io.ReadFromText(input)
| 'filter_name' >> beam.FlatMap(lambda line: dq_name(line))
| 'filter_phone' >> beam.FlatMap(lambda line: dq_phone(line))
| 'filter_zip' >> beam.FlatMap(lambda line: dq_zip(line))
| 'filter_address' >> beam.FlatMap(lambda line: dq_city(line))
| 'filter_website' >> beam.FlatMap(lambda line: dq_website(line))
| 'write' >> beam.io.WriteToText(output_prefix) )
Run Code Online (Sandbox Code Playgroud)
注意:我对此还很陌生,如果我当前的方法看起来太粗制滥造,我很抱歉。
您正在从错误的方向接近 Apache Beam(数据流)。
您正在尝试读取一行,然后一次对这一行应用转换。
相反,您需要将 Beam 视为并行处理器。您将读入所有行ReadFromText(),然后并行地将变换应用于每行。
查看函数beam.ParDo()。这将允许您创建一个可以处理 JSON 文件每一行的类。然后,您的代码将包含诸如ReadFromText(), ParDo(MyJsonProcessor()), 之类的主要步骤WriteToText()。
请记住,您的 JSON 需要是换行符分隔的 JSON。http://ndjson.org/