小编And*_*nko的帖子

使用 Apache Beam Python `WriteToFiles` 转换每个窗口只写一个文件

需要一些帮助。我有一些从 Pub/Sub 读取并写入 GCS 中的批处理文件的琐碎任务,但是在使用 fileio.WriteToFiles 时遇到了一些困难

with beam.Pipeline(options=pipeline_options) as p:
  input = (p | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
             | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
             | 'Parse' >> beam.Map(parse_json)
             | ' data w' >> beam.WindowInto(
                 FixedWindows(60),
                 accumulation_mode=AccumulationMode.DISCARDING
             ))

  event_data = (input
             | 'filter events' >> beam.Filter(lambda x: x['t'] == 'event')
             | 'encode et' >> beam.Map(lambda x: json.dumps(x))
             | 'write events to file' >> fileio.WriteToFiles(
                    path='gs://extention/ga_analytics/events/', shards=0))
Run Code Online (Sandbox Code Playgroud)

窗口触发后我需要一个文件,但文件数等于来自 Pubsub 的消息数,有人可以帮助我吗? 当前输出文件, 但我只需要一个文件。

python google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
613
查看次数