Streaming from Pub/Sub to BigQuery

Mar*_*nzo 5 python google-bigquery google-cloud-platform google-cloud-pubsub google-cloud-dataflow

I'm trying to stream some data from Google Pub/Sub into BigQuery using Python Dataflow. For testing purposes, I adapted the following code https://github.com/GoogleCloudPlatform/DataflowSDK-examples/blob/master/python/dataflow_examples/cookbook/bigquery_schema.py into a streaming pipeline by setting

options.view_as(StandardOptions).streaming = True

Then I changed the record_ids pipeline to read from Pub/Sub:

# ADDED THIS
lines = (p
         | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC)
         | beam.WindowInto(window.FixedWindows(15)))
# CHANGED THIS # record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
record_ids = lines | 'Split' >> (beam.FlatMap(split_fn).with_output_types(unicode))
records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
records | 'Write' >> beam.io.Write(
    beam.io.BigQuerySink(
        OUTPUT,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

Note: I have been whitelisted by Google to run the code (it's in alpha).

Now, when I try it, I get an error:

Workflow failed. Causes: (f215df7c8fcdbb00): Unknown streaming sink: bigquery

You can find the full code here: https://github.com/marcorigodanzo/gcp_streaming_test/blob/master/my_bigquery_schema.py

I think this is related to the pipeline now being a streaming pipeline. Can anyone tell me how to do a BigQuery write in a streaming pipeline?

jkf*_*kff 2

Beam Python does not support writing to BigQuery from streaming pipelines. For now, you need to use Beam Java instead; you can use PubsubIO.readStrings() and BigQueryIO.writeTableRows() to read from Pub/Sub and write to BigQuery, respectively.
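
A minimal sketch of what that Java pipeline might look like, assuming Beam 2.x. Only PubsubIO.readStrings() and BigQueryIO.writeTableRows() come from this answer; the topic and table names, the one-column record_id schema, and the 15-second windowing (mirroring the Python code above) are placeholders, not from the original post:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;

public class PubsubToBigQuery {
  public static void main(String[] args) {
    StreamingOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingOptions.class);
    options.setStreaming(true);  // Java equivalent of the Python streaming flag above

    // Hypothetical one-column schema for the record_id strings.
    TableSchema schema = new TableSchema().setFields(Collections.singletonList(
        new TableFieldSchema().setName("record_id").setType("STRING")));

    Pipeline p = Pipeline.create(options);
    p.apply("Read PubSub", PubsubIO.readStrings()
            .fromTopic("projects/YOUR_PROJECT/topics/YOUR_TOPIC"))
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(15))))
        .apply("To TableRow", MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String line) -> new TableRow().set("record_id", line)))
        .apply("Write", BigQueryIO.writeTableRows()
            .to("YOUR_PROJECT:your_dataset.your_table")
            .withSchema(schema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            // WRITE_TRUNCATE is rejected for unbounded input, so append instead.
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    p.run();
  }
}

One difference from the Python attempt worth noting: BigQueryIO in Java does not allow WRITE_TRUNCATE on an unbounded (streaming) input, so the sketch uses WRITE_APPEND where the original code used WRITE_TRUNCATE.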