我正在尝试遵循缓慢变化的查找缓存的设计模式(https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1)用于在 DataFlow 上使用适用于 Apache Beam 的 Python SDK 的流式传输管道。
\n\n我们的查找缓存参考表位于 BigQuery 中,我们能够读取它并将其作为 ParDo 操作的侧输入传递,但无论我们如何设置触发器/窗口,它都不会刷新。
\n\nclass FilterAlertDoFn(beam.DoFn):\n def process(self, element, alertlist):\n\n print len(alertlist)\n print alertlist\n\n \xe2\x80\xa6 # function logic\nRun Code Online (Sandbox Code Playgroud)\n\nalert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))\n | \xe2\x80\x98alert_side_input\xe2\x80\x99 >> beam.WindowInto(\n beam.window.GlobalWindows(),\n trigger=trigger.RepeatedlyTrigger(trigger.AfterWatermark(\n late=trigger.AfterCount(1)\n )),\n accumulation_mode=trigger.AccumulationMode.ACCUMULATING\n )\n | beam.Map(lambda elem: elem[\xe2\x80\x98SOMEKEY\xe2\x80\x99])\n)\n\n...\n\n\nmain_input | \xe2\x80\x98alerts\xe2\x80\x99 >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))\nRun Code Online (Sandbox Code Playgroud)\n\n基于此处的 I/O 页面 ( https://beam.apache.org/documentation/io/built-in/ ),它表示 Python SDK 仅支持 BigQuery Sink 的流式传输,这是否意味着 BQ 读取是有界源那么在这个方法中能不能刷新\xe2\x80\x99呢?
\n\n尝试在源上设置非全局窗口会导致侧输入中出现空 PCollection。
\n\n更新 …