我正在尝试完成以下操作: Beam / Dataflow中的批处理PCollection
上面链接中的答案是Java,而我使用的语言是Python。因此,我需要一些帮助来获得类似的构造。
具体来说我有这个:
p = beam.Pipeline (options = pipeline_options)
lines = p | 'File reading' >> ReadFromText (known_args.input)
Run Code Online (Sandbox Code Playgroud)
此后,由于我的用例需要一组行,因此我需要创建另一个,PCollection但要包含ListN行的“行”。我无法逐行操作。
我尝试了一个ParDo函数,该函数使用变量将计数与N行计数器相关联,并在groupBy使用之后Map。但是这些每1000条记录会重置一次,所以这不是我想要的解决方案。我阅读了链接中的示例,但我不知道如何在Python中执行类似的操作。
我尝试将计数器保存在数据存储区中,但是,使用数据存储区读写数据流之间的速度差异非常明显。
正确的方法是什么?我不知道该怎么办。问候。