基于 PubSub 通知启动的数据流作业 - Python

Ash*_* KS 0 python google-cloud-platform google-cloud-pubsub google-cloud-dataflow apache-beam

我正在编写一个数据流作业,它从 BigQuery 读取数据并进行一些转换。

data = (
    pipeline
    | beam.io.ReadFromBigQuery(query='''
    SELECT * FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
    ''', use_standard_sql=True)
    | beam.Map(print)
)
Run Code Online (Sandbox Code Playgroud)

但我的要求是仅在收到来自 PubSub 主题的通知后才从 BigQuery 读取。仅当收到以下消息时,上述 DataFlow 作业才应开始从 BigQuery 读取数据。如果是不同的作业 ID 或不同的状态,则不应执行任何操作。

PubSub Message : {'job_id':101, 'status': 'Success'}
Run Code Online (Sandbox Code Playgroud)

这部分有什么帮助吗?

Cap*_*bla 5

这相当简单,代码如下所示

pubsub_msg = (
   pipeline
   | beam.io.gcp.pubsub.ReadFromPubSub(topic=my_topic, subscription=my_subscription)
)

bigquery_data = (
    pubsub_msg
    | beam.Filter(lambda msg: msg['job_id']==101)   # you might want to use a more sophisticated filter condition
    | beam.io.ReadFromBigQuery(query='''
    SELECT * FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
    ''', use_standard_sql=True)
)
bigquery_data | beam.Map(print)
Run Code Online (Sandbox Code Playgroud)

但是,如果您这样做,您将运行一个流式 DataFlow 作业(无限期地运行,或者直到您取消该作业),因为ReadFromPubSub在流式作业中会自动使用结果。因此,当消息到达 PubSub 时,这不会启动Dataflow 作业,而是一个作业已经在运行并侦听主题以执行某些操作。

如果您确实想触发 Dataflow 批处理作业,我建议您使用Dataflow 模板,并使用侦听您的 PubSub 主题的云函数启动此模板。过滤的逻辑将在此 CloudFunction 内(作为一个简单的if条件)。