小编Hul*_*oof的帖子

慢慢改变 BigQuery 的查找缓存 - Dataflow Python Streaming SDK

我正在尝试遵循缓慢变化的查找缓存的设计模式(https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1)用于在 DataFlow 上使用适用于 Apache Beam 的 Python SDK 的流式传输管道。

\n\n

我们的查找缓存参考表位于 BigQuery 中,我们能够读取它并将其作为 ParDo 操作的侧输入传递,但无论我们如何设置触发器/窗口,它都不会刷新。

\n\n
class FilterAlertDoFn(beam.DoFn):\n  def process(self, element, alertlist):\n\n    print len(alertlist)\n    print alertlist\n\n    \xe2\x80\xa6  # function logic\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n
alert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))\n                        | \xe2\x80\x98alert_side_input\xe2\x80\x99 >> beam.WindowInto(\n                            beam.window.GlobalWindows(),\n                            trigger=trigger.RepeatedlyTrigger(trigger.AfterWatermark(\n                                late=trigger.AfterCount(1)\n                            )),\n                            accumulation_mode=trigger.AccumulationMode.ACCUMULATING\n                          )\n                       | beam.Map(lambda elem: elem[\xe2\x80\x98SOMEKEY\xe2\x80\x99])\n)\n\n...\n\n\nmain_input | \xe2\x80\x98alerts\xe2\x80\x99 >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n

基于此处的 I/O 页面 ( https://beam.apache.org/documentation/io/built-in/ ),它表示 Python SDK 仅支持 BigQuery Sink 的流式传输,这是否意味着 BQ 读取是有界源那么在这个方法中能不能刷新\xe2\x80\x99呢?

\n\n

尝试在源上设置非全局窗口会导致侧输入中出现空 PCollection。

\n\n
\n\n

更新 …

python streaming dataflow apache-beam

5
推荐指数
1
解决办法
1222
查看次数

标签 统计

apache-beam ×1

dataflow ×1

python ×1

streaming ×1