Apache Beam 云数据流流卡住侧输入

fah*_*dul 8 python google-cloud-dataflow apache-beam

我目前正在 GCP Dataflow 中构建 PoC Apache Beam 管道。在本例中,我想使用来自 PubSub 的主输入和来自 BigQuery 的侧输入创建流式传输管道,并将处理后的数据存储回 BigQuery。

侧管线代码

side_pipeline = (
    p
    | "periodic" >> PeriodicImpulse(fire_interval=3600, apply_windowing=True)
    | "map to read request" >>
        beam.Map(lambda x:beam.io.gcp.bigquery.ReadFromBigQueryRequest(table=side_table))
    | beam.io.ReadAllFromBigQuery()
)
Run Code Online (Sandbox Code Playgroud)

侧面输入代码功能

def enrich_payload(payload, equipments):
    id = payload["id"]
    for equipment in equipments:
        if id == equipment["id"]:
            payload["type"] = equipment["type"]
            payload["brand"] = equipment["brand"]
            payload["year"] = equipment["year"]

            break

    return payload
Run Code Online (Sandbox Code Playgroud)

主管道代码

main_pipeline = (
    p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/my-project/topics/topiq")
    | "bytes to dict" >> beam.Map(lambda x: json.loads(x.decode("utf-8")))
    | "transform" >> beam.Map(transform_function)
    | "timestamping" >> beam.Map(lambda src: window.TimestampedValue(
        src,
        dt.datetime.fromisoformat(src["timestamp"]).timestamp()
    ))
    | "windowing" >> beam.WindowInto(window.FixedWindows(30))
)

final_pipeline = (
    main_pipeline
    | "enrich data" >> beam.Map(enrich_payload, equipments=beam.pvalue.AsIter(side_pipeline))
    | "store" >> beam.io.WriteToBigQuery(bq_table)
)

result = p.run()
result.wait_until_finish()
Run Code Online (Sandbox Code Playgroud)

将其部署到 Dataflow 后,一切看起来都很好,没有错误。但后来我注意到该enrich data步骤有两个节点而不是一个。

数据流图

此外,正如您所看到的,侧面输入卡住了,Elements Added输入集合中有 21 个计数,输出集合中 有 21 个-值。Elements Added丰富数据卡住

您可以在此处找到完整的管道代码

我已经遵循这些文档中的所有说明:

但还是发现了这个错误。请帮我。谢谢!

Iñi*_*igo 6

这里有一个工作示例:

mytopic = ""
sql = "SELECT station_id, CURRENT_TIMESTAMP() timestamp FROM `bigquery-public-data.austin_bikeshare.bikeshare_stations` LIMIT 10"

def to_bqrequest(e, sql):
    from apache_beam.io import ReadFromBigQueryRequest
    yield ReadFromBigQueryRequest(query=sql)
     

def merge(e, side):
    for i in side:
        yield f"Main {e.decode('utf-8')} Side {i}"

pubsub = p | "Read PubSub topic" >> ReadFromPubSub(topic=mytopic)

side_pcol = (p | PeriodicImpulse(fire_interval=300, apply_windowing=False)
               | "ApplyGlobalWindow" >> WindowInto(window.GlobalWindows(),
                                           trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
                                           accumulation_mode=trigger.AccumulationMode.DISCARDING)
               | "To BQ Request" >> ParDo(to_bqrequest, sql=sql)
               | ReadAllFromBigQuery()
            )

final = (pubsub | "Merge" >> ParDo(merge, side=beam.pvalue.AsList(side_pcol))
                | Map(logging.info)
        )                    
    
p.run()
Run Code Online (Sandbox Code Playgroud)

请注意,这使用了 a GlobalWindow(以便两个输入具有相同的窗口)。我使用了处理时间触发器,以便该窗格包含多行。5是任意选择的,使用1也可以。

请注意,侧输入和主输入之间的数据匹配是不确定的,您可能会看到旧触发窗格中的值波动。

理论上,使用FixedWindows应该可以解决这个问题,但我无法让它FixedWindows工作。

  • 很高兴看到它有效!哦,我遗憾的是,我在度假时看到了这个问题,我正在等待回来开始编码:D 不需要创建新的赏金,我很高兴这对你有用 (3认同)