如何使用 Python 处理 Dataflow 管道中的 BigQuery 插入错误?

JPM*_*PMC 3 python google-bigquery google-cloud-pubsub google-cloud-dataflow apache-beam

我正在尝试使用 Dataflow 创建一个流式管道,该管道从 PubSub 主题读取消息以最终将它们写入 BigQuery 表。我不想使用任何 Dataflow 模板。

目前,我只想在从 Google VM 实例执行的 Python3 脚本中创建一个管道,以对从 Pubsub 到达的每条消息(解析它包含的记录并添加一个新字段)执行加载和转换过程,以结束将结果写入 BigQuery 表。

简化,我的代码是:

#!/usr/bin/env python
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1, 
import apache_beam as beam
import apache_beam.io.gcp.bigquery
import logging
import argparse
import sys
import json
from datetime import datetime, timedelta

def load_pubsub(message):
    try:
        data = json.loads(message)
        records = data["messages"]
        return records
    except:
        raise ImportError("Something went wrong reading data from the Pub/Sub topic")

class ParseTransformPubSub(beam.DoFn):
    def __init__(self):
        self.water_mark = (datetime.now() + timedelta(hours = 1)).strftime("%Y-%m-%d %H:%M:%S.%f")
    def process(self, records):
        for record in records:
            record["E"] = self.water_mark 
            yield record

def main():
    table_schema = apache_beam.io.gcp.bigquery.parse_table_schema_from_json(open("TableSchema.json"))
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_topic')
    parser.add_argument('--output_table')
    known_args, pipeline_args = parser.parse_known_args(sys.argv)
    with beam.Pipeline(argv = pipeline_args) as p:
        pipe = ( p | 'ReadDataFromPubSub' >> beam.io.ReadStringsFromPubSub(known_args.input_topic)
                   | 'LoadJSON' >> beam.Map(load_pubsub)
                   | 'ParseTransform' >> beam.ParDo(ParseTransformPubSub())
                   | 'WriteToAvailabilityTable' >> beam.io.WriteToBigQuery(
                                      table = known_args.output_table,
                                      schema = table_schema,
                                      create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                      write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND)
                )   
        result = p.run()
        result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Run Code Online (Sandbox Code Playgroud)

(例如)在 PubSub 主题中发布的消息使用如下:

'{"messages":[{"A":"Alpha", "B":"V1", "C":3, "D":12},{"A":"Alpha", "B":"V1", "C":5, "D":14},{"A":"Alpha", "B":"V1", "C":3, "D":22}]}'
Run Code Online (Sandbox Code Playgroud)

如果在记录中添加了字段“E”,那么记录的结构(Python 中的字典)和字段的数据类型就是 BigQuery 表所期望的。

问题是一个我想处理如下:

  1. 如果某些消息带有意外的结构,我想将管道分叉并将它们写入另一个 BigQuery 表中。

  2. If some messages come with an unexpected data type of a field, then in the last level of the pipeline when they should be written in the table an error will occur. I want to manage this type of error by diverting the record to a third table.

I read the documentation found on the following pages but I found nothing: https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline https://cloud.google.com/dataflow/docs/guides/common-errors

By the way, if I choose the option to configure the pipeline through the template that reads from a PubSubSubscription and writes into BigQuery I get the following schema which turns out to be the same one I'm looking for:

Template: Cloud Pub/Sub Subscription to BigQuery

gui*_*ere 6

您无法捕捉到 BigQuery 的接收器中发生的错误。您写入 bigquery 的消息必须是好的。

最好的模式是执行一个检查你的消息结构和字段类型的转换。如果出现错误,您可以创建一个错误流并将此问题流写入文件(例如,或在没有架构的表中,您以纯文本形式写入消息)

  • [Beam 编程指南提供了您可以做什么的完整示例](https://beam.apache.org/documentation/programming-guide/#additional-outputs)。在 DoFn 函数(如“ProcessWords”)中,执行您想要确保流程正确的检查。对于发现的所有错误,执行此“yield pvalue.TaggedOutput('error_value', element)”。通过应用 ParDo,您将在输出中获得 2 个 PCollection:正确流和错误流。然后在每个 PCollection 上应用您想要的接收器。 (4认同)