需要一些帮助。我有一些从 Pub/Sub 读取并写入 GCS 中的批处理文件的琐碎任务,但是在使用 fileio.WriteToFiles 时遇到了一些困难
with beam.Pipeline(options=pipeline_options) as p:
input = (p | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| 'Parse' >> beam.Map(parse_json)
| ' data w' >> beam.WindowInto(
FixedWindows(60),
accumulation_mode=AccumulationMode.DISCARDING
))
event_data = (input
| 'filter events' >> beam.Filter(lambda x: x['t'] == 'event')
| 'encode et' >> beam.Map(lambda x: json.dumps(x))
| 'write events to file' >> fileio.WriteToFiles(
path='gs://extention/ga_analytics/events/', shards=0))
Run Code Online (Sandbox Code Playgroud)
窗口触发后我需要一个文件,但文件数等于来自 Pubsub 的消息数,有人可以帮助我吗? 当前输出文件, 但我只需要一个文件。