G B*_*G B 4 google-cloud-dataflow
我想知道为什么这个#sideInput()方法移到了ProcessContext课堂?以前我可以在#startBundle()方法中做一些额外的处理并缓存结果.这样做#processElement()听起来效率低下.当然,我可以在将数据传递给视图之前进行预处理,但是仍然需要调用#sideInput()每个元素的开销......
谢谢,G
好问题.原因是我们添加了对窗口PCollections的支持作为侧输入.这可以实现其他方案,包括在流模式下使用具有无界PCollections的侧输入.
在更改之前,我们仅支持全局窗口化的侧输入,然后在处理主输入PCollection的每个元素时可以使用整个侧输入PCollection.这适用于传统批处理样式处理中的有界PCollections,但没有扩展到窗口或无界PCollections.
更改后,您在ParDo中处理的当前元素的窗口控制侧输入的哪个子集可见.(因此,您无法访问startBundle()中的侧输入,其中没有当前元素,因此没有当前窗口.)
例如,考虑一个示例,其中您有一个流管道处理您的网站日志并提供实时使用仪表板的实时更新.您有两个无限输入PCollections:一个包含新用户注册,另一个包含用户点击.您可以通过按小时对PCollections进行窗口化并通过用户点击进行ParDo来识别哪些用户点击来自新用户,这些点击将新用户注册作为侧输入.现在,当您处理给定小时内的用户点击时,您会自动查看同一小时内新用户注册的子集.您可以通过更改窗口功能并在侧输入上及时向前移动元素时间戳来对此进行不同的变体 - 例如继续按窗口显示每小时的用户点击次数,但使用过去24小时内的新注册.
我确实同意这种更改使得在您的侧输入上缓存任何后处理变得更加困难.我们添加了View.asMultimap来处理将Iterable转换为查找表的常见情况.如果您的后处理是按元素进行的,则可以在创建PCollectionView之前使用ParDo执行此操作.对于其他任何事情,我建议从processElement中懒散地做.我有兴趣了解发生的其他模式,因此我们可以研究如何提高它们的效率.
| 归档时间: |
|
| 查看次数: |
379 次 |
| 最近记录: |