我正在尝试使用 Dataflow 创建一个流式管道,该管道从 PubSub 主题读取消息以最终将它们写入 BigQuery 表。我不想使用任何 Dataflow 模板。
目前,我只想在从 Google VM 实例执行的 Python3 脚本中创建一个管道,以对从 Pubsub 到达的每条消息(解析它包含的记录并添加一个新字段)执行加载和转换过程,以结束将结果写入 BigQuery 表。
简化,我的代码是:
#!/usr/bin/env python
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1,
import apache_beam as beam
import apache_beam.io.gcp.bigquery
import logging
import argparse
import sys
import json
from datetime import datetime, timedelta
def load_pubsub(message):
try:
data = json.loads(message)
records = data["messages"]
return records
except:
raise ImportError("Something went wrong reading data from the Pub/Sub topic")
class ParseTransformPubSub(beam.DoFn):
def __init__(self):
self.water_mark = (datetime.now() + timedelta(hours …Run Code Online (Sandbox Code Playgroud) python google-bigquery google-cloud-pubsub google-cloud-dataflow apache-beam