Slowly Changing lookup cache from BigQuery - Dataflow Python Streaming SDK

Hul*_*oof 5 python streaming dataflow apache-beam

I am trying to follow the design pattern for a slowly-changing lookup cache (https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1) in a streaming pipeline on Dataflow using the Apache Beam Python SDK.


Our reference table for the lookup cache sits in BigQuery. We are able to read it and pass it in as a side input to a ParDo operation, but it does not refresh, no matter how we set up the trigger/windows.

```python
class FilterAlertDoFn(beam.DoFn):
  def process(self, element, alertlist):

    print(len(alertlist))
    print(alertlist)

    ...  # function logic
```
```python
alert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))
                 | 'alert_side_input' >> beam.WindowInto(
                       beam.window.GlobalWindows(),
                       trigger=trigger.Repeatedly(trigger.AfterWatermark(
                           late=trigger.AfterCount(1)
                       )),
                       accumulation_mode=trigger.AccumulationMode.ACCUMULATING
                   )
                 | beam.Map(lambda elem: elem['SOMEKEY'])
)

...

main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))
```

Based on the built-in I/O page (https://beam.apache.org/documentation/io/built-in/), which says the Python SDK only supports streaming for the BigQuery sink, does that mean the BigQuery read is a bounded source, and therefore can't be refreshed with this method?


Trying to set a non-global window on the source results in an empty PCollection in the side input.
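For illustration, a minimal sketch of what such an attempt looks like (the 60-second FixedWindows value is an arbitrary example applied to the same BigQuery read as above):

```python
# Illustrative sketch only: applying a non-global (fixed) window to the
# bounded BigQuery read before using it as a side input. As described
# above, the side input built this way ends up empty.
windowed_alert_input = (p
    | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))
    | 'windowed_side_input' >> beam.WindowInto(beam.window.FixedWindows(60))
    | beam.Map(lambda elem: elem['SOMEKEY']))

main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(),
                                    beam.pvalue.AsList(windowed_alert_input))
```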


Update:
When trying to implement the strategy suggested in Pablo's answer, the ParDo operation that uses the side input never runs.


There is a single input source that goes to two outputs, one of which uses the side input. The non-side-input branch still reaches its destination, while the side-input pipeline never enters FilterAlertDoFn().


If I substitute a dummy value for the side input, the pipeline does enter the function. Is it perhaps waiting for a suitable window that never arrives?


With the same FilterAlertDoFn() as above, my side_input and the call now look like this:

```python
def refresh_side_input(_):
    query = 'select col from table'
    client = bigquery.Client(project='gcp-project')
    query_job = client.query(query)

    return query_job.result()


trigger_input = (p | 'alert_ref_trigger' >> beam.io.ReadFromPubSub(
    subscription=known_args.trigger_subscription))


bigquery_side_input = beam.pvalue.AsSingleton((trigger_input
    | beam.WindowInto(beam.window.GlobalWindows(),
                      trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                      accumulation_mode=trigger.AccumulationMode.DISCARDING)
    | beam.Map(refresh_side_input)
))

...

# Passing this as side input doesn't work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), bigquery_side_input)

# Passing dummy variable as side input does work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), [1])
```

I tried several different versions of refresh_side_input(), and they all report the expected result when I check the return value inside the function.


Update 2:


I made some minor modifications to Pablo's code and I get the same behaviour: the DoFn never executes.


In the example below, I see 'in_load_conversion_data' whenever I publish to some_other_topic, but I never see 'in_DoFn' when publishing to some_topic.

```python
import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.transforms import trigger
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def load_my_conversion_data():
    return {'EURUSD': 1.1, 'USDMXN': 4.4}


def load_conversion_data(_):
    # I will suppose that these are currency conversions. E.g.
    # {'EURUSD': 1.1, 'USDMXN': 20, ...}
    print('in_load_conversion_data')
    return load_my_conversion_data()


class ConvertTo(beam.DoFn):
    def __init__(self, target_currency):
        self.target_currency = target_currency

    def process(self, elm, rates):
        print('in_DoFn')
        elm = elm.attributes
        if elm['currency'] == self.target_currency:
            yield elm
        elif '%s%s' % (elm['currency'], self.target_currency) in rates:
            rate = rates['%s%s' % (elm['currency'], self.target_currency)]
            # Copy the element and overwrite currency/value with the converted values.
            result = dict(elm)
            result.update({'currency': self.target_currency,
                           'value': elm['value'] * rate})
            yield result
        else:
            return  # We drop that value


pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True

some_topic = 'projects/some_project/topics/some_topic'
some_other_topic = 'projects/some_project/topics/some_other_topic'

with beam.Pipeline(options=pipeline_options) as p:

    table_pcv = beam.pvalue.AsSingleton((
        p
        | 'some_other_topic' >> beam.io.ReadFromPubSub(topic=some_other_topic, with_attributes=True)
        | 'some_other_window' >> beam.WindowInto(window.GlobalWindows(),
                                                 trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                                                 accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | beam.Map(load_conversion_data)))

    _ = (p | 'some_topic' >> beam.io.ReadFromPubSub(topic=some_topic)
         | 'some_window' >> beam.WindowInto(window.FixedWindows(1))
         | beam.ParDo(ConvertTo('USD'), rates=table_pcv))
```

Pab*_*blo 0

As you pointed out, the Java SDK allows you to use more streaming utilities, such as timers and state. These utilities help with implementing these kinds of pipelines.


The Python SDK lacks some of these utilities, specifically timers. For that reason we need to use a hack, where the reload of the side input can be triggered by inserting messages into our some_other_topic topic in Pub/Sub.
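For completeness, here is a minimal sketch of how such a trigger message could be published from outside the pipeline (for example from a cron job or Cloud Scheduler), using the google-cloud-pubsub client; the project and topic names are placeholders matching the example below:

```python
# Minimal sketch, assuming google-cloud-pubsub is installed and the
# placeholder project/topic exist. Publishing any message to the trigger
# topic causes the side input to be reloaded.
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('some_project', 'some_other_topic')

# The payload is irrelevant; the arrival of the message is what fires the
# AfterCount(1) trigger on the global window.
publisher.publish(topic_path, data=b'refresh').result()
```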


This also means that you have to manually perform the lookup into BigQuery. You can probably use the apache_beam.io.gcp.bigquery_tools.BigQueryWrapper class to perform lookups directly into BigQuery.
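As a rough sketch of that manual lookup (here using the plain google-cloud-bigquery client rather than BigQueryWrapper, with hypothetical project, dataset and table names), the refresh function could look roughly like this, and could be substituted for the external_service call in the example below:

```python
from google.cloud import bigquery


def load_conversion_data_from_bq(_):
    # Hypothetical lookup: re-query the reference table every time a
    # trigger message arrives. Project, dataset and table names are
    # placeholders.
    client = bigquery.Client(project='some_project')
    rows = client.query(
        'SELECT pair, rate FROM some_dataset.conversion_rates').result()
    # Materialise the result into a plain dict so it can be used as a
    # singleton side input.
    return {row['pair']: row['rate'] for row in rows}
```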


Here is an example of a pipeline that refreshes some currency conversion data. I haven't tested it, but I'm 90% sure it will work with only a few adjustments. Let me know if this helps.

```python
import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.transforms import trigger
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)


def load_conversion_data(_):
  # I will suppose that these are currency conversions. E.g.
  # {'EURUSD': 1.1, 'USDMXN': 20, ...}
  return external_service.load_my_conversion_data()


# some_other_topic is the Pub/Sub topic we publish to whenever the side
# input should be reloaded.
table_pcv = beam.pvalue.AsSingleton((
  p
  | beam.io.ReadFromPubSub(topic=some_other_topic)
  | beam.WindowInto(window.GlobalWindows(),
                    trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                    accumulation_mode=trigger.AccumulationMode.DISCARDING)
  | beam.Map(load_conversion_data)))


class ConvertTo(beam.DoFn):
  def __init__(self, target_currency):
    self.target_currency = target_currency

  def process(self, elm, rates):
    if elm['currency'] == self.target_currency:
      yield elm
    elif '%s%s' % (elm['currency'], self.target_currency) in rates:
      rate = rates['%s%s' % (elm['currency'], self.target_currency)]
      # Copy the element and overwrite currency/value with the converted values.
      result = dict(elm)
      result.update({'currency': self.target_currency,
                     'value': elm['value'] * rate})
      yield result
    else:
      return  # We drop that value


_ = (p
     | beam.io.ReadFromPubSub(topic=some_topic)
     | beam.WindowInto(window.FixedWindows(1))
     | beam.ParDo(ConvertTo('USD'), rates=table_pcv))
```