如何在 Python 中使用 Apache Beam 读取和操作 Json 文件

Rim*_*Rim 1 python google-cloud-platform google-cloud-dataflow apache-beam

我有一个 .txt 文件,它具有 JSON 格式。我想读取、操作和重组文件(更改字段名称...) 如何使用 Apache Beam 在 Python 中执行此操作?

小智 7

为了能够在 Python 上使用 Apache Beam 读取 Json 文件,您可以创建一个自定义编码器:

CF:https : //beam.apache.org/documentation/programming-guide/#specifying-coders

class JsonCoder(object):
"""A JSON coder interpreting each line as a JSON string."""

def encode(self, x):
    return json.dumps(x)

def decode(self, x):
    return json.loads(x)
Run Code Online (Sandbox Code Playgroud)

然后您必须在读取或写入数据时指定它,例如:

lines = p | 'read_data' >> ReadFromText(known_args.input, coder=JsonCoder())
Run Code Online (Sandbox Code Playgroud)

最好的问候,工作顺利;)