Conditional statement in a Python Apache Beam pipeline

IoT*_*ser 3 python google-cloud-dataflow apache-beam

Current situation

The pipeline reads payloads containing geo data from Pub/Sub, transforms and analyzes that data, and finally returns whether a condition is true or false.

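import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions(streaming=True)  # reading from Pub/Sub requires streaming mode


def GeoDataIngestion(string_input):
    """Analyze one Pub/Sub payload (bytes) and return True or False."""
    condition_met = False
    # <...> (the geo-data analysis is elided in the original question)
    return condition_met


with beam.Pipeline(options=pipeline_options) as p:
    raw_data = (p
                | 'Read from PubSub' >> beam.io.ReadFromPubSub(
                    subscription='projects/XXX/subscriptions/YYY'))

    geo_data = (raw_data
                | 'Geo data transform' >> beam.Map(GeoDataIngestion))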

Ideal scenario 1

If GeoDataIngestion returns True, raw_data should be stored in BigQuery.

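geo_data = (raw_data
            | 'Geo data transform' >> beam.Map(GeoDataIngestion)
            | 'Evaluate condition' >> beam.Map(Condition))
# Note: after the first Map each element is only True or False;
# the original raw_data payload is no longer available to Condition.

def Condition(condition):
    if condition:
        <...WriteToBigQuery...>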


# The class I used before to store raw_data unconditionally (without evaluating the condition).
# FormatBigQueryFn and TABLE_SCHEMA are defined elsewhere (not shown).

class WriteToBigQuery(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | 'Format' >> beam.ParDo(FormatBigQueryFn())
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                'XXX',
                schema=TABLE_SCHEMA,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

Ideal scenario 2

Instead of storing the data in BigQuery, send it to Pub/Sub, choosing the topic based on the condition:

def Condition(condition):
    if condition:
        <...SendToPubSub(Topic1)...>
    else:
        <...SendToPubSub(Topic2)...>

The problem here is selecting the topic from the condition's result: I can't pass a parameter such as the topic to the following step, either in the pipeline or inside a function/class:

 | beam.io.WriteStringsToPubSub(TOPIC)

Question

How can I achieve this?

If the evaluated condition is True, how and where should I call WriteToBigQuery to store the PCollection raw_data?

Tan*_*din 10

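I think branching the collection based on the result of evaluating the condition could help in your scenario. See the Apache Beam documentation on branching PCollections.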

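To illustrate branching, suppose I have the collection below and you want to perform a different operation depending on the content of each string: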

'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2'

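The code below tags the elements of the collection, so you get three different PCollections, one per tag. You can then decide which further operations to apply to each collection.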

import apache_beam as beam
from apache_beam import pvalue
import sys

class Split(beam.DoFn):

    # These tags will be used to tag the outputs of this DoFn.
    OUTPUT_TAG_BQ = 'BigQuery'
    OUTPUT_TAG_PS1 = 'pubsub topic1'
    OUTPUT_TAG_PS2 = 'pubsub topic2'

    def process(self, element):
        """
        Tags each element as it processes the original PCollection.
        """
        print(element)
        if "BigQuery" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
            print('found bq')
        elif "pubsub topic1" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
        elif "pubsub topic2" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)


if __name__ == '__main__':
    output_prefix = 'C:\\pythonVirtual\\Mycodes\\output'
    p = beam.Pipeline(argv=sys.argv)
    lines = (p
             | beam.Create([
                 'this line is for BigQuery',
                 'this line for pubsub topic1',
                 'this line for pubsub topic2']))

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    tagged_lines_result = (lines
                           | beam.ParDo(Split()).with_outputs(
                               Split.OUTPUT_TAG_BQ,
                               Split.OUTPUT_TAG_PS1,
                               Split.OUTPUT_TAG_PS2))

    # tagged_lines_result is an object of type DoOutputsTuple. It supports
    # accessing results in alternative ways.
    # Each branch is written to text files here; in the real pipeline these
    # would be the BigQuery and Pub/Sub writes.
    bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ] | "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
    ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
    ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')

    p.run().wait_until_finish()

Let me know if this helps.