IoT*_*ser (score: 3) | tags: python, google-cloud-dataflow, apache-beam
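Current situation
The purpose of the pipeline is to read payloads containing geographic data from Pub/Sub, transform and analyze that data, and finally return whether a condition is true or false.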
with beam.Pipeline(options=pipeline_options) as p:
    raw_data = (p
                | 'Read from PubSub' >> beam.io.ReadFromPubSub(
                    subscription='projects/XXX/subscriptions/YYY'))
    geo_data = (raw_data
                | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))

def GeoDataIngestion(string_input):
    <...>
    return True or False
Ideal scenario 1
If GeoDataIngestion returns true, raw_data should be stored in BigQuery:
geo_data = (raw_data
            | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
            | 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
            )

def Condition(condition):
    if condition:
        <...WriteToBigQuery...>

# The class I used before to store raw_data regardless of the evaluated condition:
class WriteToBigQuery(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | 'Format' >> beam.ParDo(FormatBigQueryFn())
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                'XXX',
                schema=TABLE_SCHEMA,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
Ideal scenario 2
Instead of storing the data in BigQuery, send it to Pub/Sub:
def Condition(condition):
    if condition:
        <...SendToPubSub(Topic1)...>
    else:
        <...SendToPubSub(Topic2)...>
The problem here is choosing the topic based on the result of the condition: I can't pass a parameter such as the topic either in the pipeline

    | beam.io.WriteStringsToPubSub(TOPIC)

or inside a function/class.
Question
How can I achieve this?
If the evaluated condition is true, how and where should I call WriteToBigQuery to store the PCollection raw_data?
Answer by Tan*_*din (score: 10)
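I think branching the collection based on the result of evaluating the condition could help in your scenario. See the Apache Beam documentation on branching PCollections.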
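To illustrate branching, suppose I have the collection below and you want to perform different operations depending on the content of each string: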
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2'
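The code below tags each element of the collection, so you get three different PCollections, one per tag. You can then decide which further operations to apply to each collection.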
import apache_beam as beam
from apache_beam import pvalue
import sys


class Split(beam.DoFn):
    # These tags will be used to tag the outputs of this DoFn.
    OUTPUT_TAG_BQ = 'BigQuery'
    OUTPUT_TAG_PS1 = 'pubsub topic1'
    OUTPUT_TAG_PS2 = 'pubsub topic2'

    def process(self, element):
        """
        Tags the input as it processes the original PCollection.
        Elements that match none of the checks are dropped.
        """
        print(element)
        if "BigQuery" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
            print('found bq')
        elif "pubsub topic1" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
        elif "pubsub topic2" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)


if __name__ == '__main__':
    output_prefix = 'C:\\pythonVirtual\\Mycodes\\output'
    p = beam.Pipeline(argv=sys.argv)

    lines = (p
             | beam.Create([
                 'this line is for BigQuery',
                 'this line for pubsub topic1',
                 'this line for pubsub topic2']))

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    tagged_lines_result = (lines
                           | beam.ParDo(Split()).with_outputs(
                               Split.OUTPUT_TAG_BQ,
                               Split.OUTPUT_TAG_PS1,
                               Split.OUTPUT_TAG_PS2))

    # tagged_lines_result is an object of type DoOutputsTuple. It supports
    # accessing results in alternative ways; here each output is read by tag.
    # WriteToText stands in for the real sinks (BigQuery, Pub/Sub topics).
    bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ] | "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
    ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
    ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')

    p.run().wait_until_finish()
Let me know if this helps.
Views: 4380