pwa*_*ach 6 python google-cloud-dataflow apache-beam
使用流式插入和 Python SDK 2.23 写入 BigQuery 时,我遇到了意外的性能问题。
如果没有写入步骤,管道将在一个工作线程上运行,CPU 约为 20-30%。添加 BigQuery 步骤后,管道可扩展到 6 个工作线程,所有工作线程的 CPU 利用率为 70-90%。
我对 Dataflow 和 Beam 还很陌生,可能这种行为是正常的,或者我做错了什么,但在我看来,使用 6 台机器每秒向 BigQuery 写入 250 行有点繁重。我想知道如何才能达到每秒 100K 行的插入配额。
我的管道如下所示:
p
| "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=options.pubsub_subscription) # ~40/s
| "Split messages" >> beam.FlatMap(split_messages) # ~ 400/s
| "Prepare message for BigQuery" >> beam.Map(prepare_row)
| "Filter known message types" >> beam.Filter(filter_message_types) # ~ 250/s
| "Write to BigQuery" >> beam.io.WriteToBigQuery(
table=options.table_spec_position,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
additional_bq_parameters=additional_bq_parameters,
)
Run Code Online (Sandbox Code Playgroud)
尽管我在不使用流引擎的情况下经历了类似的行为,但管道使用这些选项运行。
--enable_streaming_engine \
--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=15 \
--machine_type=n1-standard-2 \
--disk_size_gb=30 \
Run Code Online (Sandbox Code Playgroud)
我的问题是这种行为是否正常,或者我可以采取什么措施来减少该管道所需的工人数量。谢谢!
更新:这是带有墙时间的数据流图最后一步的图像。(作业运行 1 小时后拍摄)。之前的所有其他步骤的墙时间都非常短,只有几秒钟。
经过一番调试后,我发现有一些无效消息无法写入 BigQuery(并且没有记录错误)。因此,对于遇到类似问题的任何人:
insert_retry_strategy将of更改beam.io.WriteToBigQuery为RETRY_NEVER并打印出死信 pCollection 后,我修复了格式错误的消息,并且性能得到了提高。我猜想由于 的默认策略,一些无效消息被卡住了RETRY_ALWAYS。