Ash*_* KS 0 python google-cloud-platform google-cloud-pubsub google-cloud-dataflow apache-beam
我正在编写一个数据流作业,它从 BigQuery 读取数据并进行一些转换。
data = (
pipeline
| beam.io.ReadFromBigQuery(query='''
SELECT * FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
''', use_standard_sql=True)
| beam.Map(print)
)
Run Code Online (Sandbox Code Playgroud)
但我的要求是仅在收到来自 PubSub 主题的通知后才从 BigQuery 读取。仅当收到以下消息时,上述 DataFlow 作业才应开始从 BigQuery 读取数据。如果是不同的作业 ID 或不同的状态,则不应执行任何操作。
PubSub Message : {'job_id':101, 'status': 'Success'}
Run Code Online (Sandbox Code Playgroud)
这部分有什么帮助吗?
这相当简单,代码如下所示
pubsub_msg = (
pipeline
| beam.io.gcp.pubsub.ReadFromPubSub(topic=my_topic, subscription=my_subscription)
)
bigquery_data = (
pubsub_msg
| beam.Filter(lambda msg: msg['job_id']==101) # you might want to use a more sophisticated filter condition
| beam.io.ReadFromBigQuery(query='''
SELECT * FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
''', use_standard_sql=True)
)
bigquery_data | beam.Map(print)
Run Code Online (Sandbox Code Playgroud)
但是,如果您这样做,您将运行一个流式 DataFlow 作业(无限期地运行,或者直到您取消该作业),因为ReadFromPubSub
在流式作业中会自动使用结果。因此,当消息到达 PubSub 时,这不会启动Dataflow 作业,而是一个作业已经在运行并侦听主题以执行某些操作。
如果您确实想触发 Dataflow 批处理作业,我建议您使用Dataflow 模板,并使用侦听您的 PubSub 主题的云函数启动此模板。过滤的逻辑将在此 CloudFunction 内(作为一个简单的if
条件)。
归档时间: |
|
查看次数: |
349 次 |
最近记录: |