小编JPM*_*PMC的帖子

如何使用 Python 处理 Dataflow 管道中的 BigQuery 插入错误?

我正在尝试使用 Dataflow 创建一个流式管道,该管道从 PubSub 主题读取消息以最终将它们写入 BigQuery 表。我不想使用任何 Dataflow 模板。

目前,我只想在从 Google VM 实例执行的 Python3 脚本中创建一个管道,以对从 Pubsub 到达的每条消息(解析它包含的记录并添加一个新字段)执行加载和转换过程,以结束将结果写入 BigQuery 表。

简化,我的代码是:

#!/usr/bin/env python
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1, 
import apache_beam as beam
import apache_beam.io.gcp.bigquery
import logging
import argparse
import sys
import json
from datetime import datetime, timedelta

def load_pubsub(message):
    try:
        data = json.loads(message)
        records = data["messages"]
        return records
    except:
        raise ImportError("Something went wrong reading data from the Pub/Sub topic")

class ParseTransformPubSub(beam.DoFn):
    def __init__(self):
        self.water_mark = (datetime.now() + timedelta(hours …
Run Code Online (Sandbox Code Playgroud)

python google-bigquery google-cloud-pubsub google-cloud-dataflow apache-beam

3
推荐指数
1
解决办法
1433
查看次数