tot*_*ooo 2 python google-cloud-dataflow apache-beam
我的几个 PCollections(来自不同来源)必须以相同的方式解码。
hits = (msgs | 'Parse' >> beam.Map(parse)
| 'Decode' >> beam.Map(decode_hit))
Run Code Online (Sandbox Code Playgroud)
然后:
dummy_hits = (dummy_msgs | 'Parse' >> beam.Map(parse)
| 'Decode' >> beam.Map(decode_hit))
Run Code Online (Sandbox Code Playgroud)
多亏了我之前给它们的名字,如果我可以重用这些转换,那就太好了。我天真地尝试了这个:
dummy_hits = (dummy_msgs | 'Parse'
| 'Decode')
Run Code Online (Sandbox Code Playgroud)
但我的管道不会建立。(类型错误:需要一个 PTransform 对象,得到解析)。
我认为有可能作为管道模块的文档说明:“如果需要应用相同的转换实例,则应使用右移运算符来指定新名称(例如input | "label" >> my_tranform)”
这样做的方法是什么?只有这可能吗?
名称必须是唯一的,但由于您的步骤顺序相同,因此您可能想要创建这样的复合变换
https://beam.apache.org/get-started/wordcount-example/#creating-composite-transforms
所以这样做:
class ParseDecode(beam.PTransform):
def expand(self, pcoll):
return (pcoll
| 'Parse' >> beam.Map(parse)
| 'Decode' >> beam.Map(decode_hit))
Run Code Online (Sandbox Code Playgroud)
这样你就可以做到这一点:
hits = (msgs | 'Parse msgs' >> ParseDecode()
Run Code Online (Sandbox Code Playgroud)
然后这个:
dummy_hits = (dummy_msgs | 'Parse dummy msgs' >> ParseDecode()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1099 次 |
| 最近记录: |