eeb*_*ruu 5 python google-bigquery google-cloud-platform google-cloud-dataflow apache-beam
我正在使用 apache beam pipeline,我想使用 python 批量插入到 bigquery。我的数据来自无限制的 Pub/Sub。根据我的研究结果,带有触发器的 GlobalWindows 应该可以解决我的问题。我尝试使用窗口化管道,但它仍然进行流式插入。我的管道代码如下:
p2 = (p | 'Read ' >> beam.io.ReadFromPubSub(subscription=subscription_path,
with_attributes=True,
timestamp_attribute=None,id_label=None)
| 'Windowing' >> beam.WindowInto(window.GlobalWindows(),
trigger=Repeatedly(
AfterAny(
AfterCount(100),
AfterProcessingTime(1 * 60))),
accumulation_mode=AccumulationMode.DISCARDING)
| 'Process ' >> beam.Map(getAttributes))
p3 = (p2 | 'Filter ' >> beam.Filter(lambda msg: (("xx" in msg) and (msg["xx"].lower() == "true")))
| 'Delete ' >> beam.Map(deleteAttribute)
| 'Write ' >> writeTable(bq_table_test, bq_batch_size))
def writeTable(table_name):
return beam.io.WriteToBigQuery(
table=table_name,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
batch_size=100)
Run Code Online (Sandbox Code Playgroud)
我正在从计费报告中检查插入是批量还是流式插入。当 Streming 插入使用量增加时,我了解到批量插入没有发生。是否还有其他功能可以检查插入是流式插入还是批量插入?还有如何批量插入到 bigquery ?
根据文档,您无法指定插入类型,它会根据您的输入自动检测PCollection:
Beam SDK for Python 目前不支持指定插入方法。
BigQueryIO 支持两种将数据插入 BigQuery 的方法:加载作业和流式插入。每种插入方法都提供不同的成本、配额和数据一致性权衡。有关这些权衡的更多信息,请参阅加载作业和流式插入的 BigQuery 文档。
BigQueryIO 根据输入 PCollection 选择默认插入方法。
当您将 BigQueryIO 写入转换应用于有界 PCollection 时,BigQueryIO 会使用加载作业。
当您将 BigQueryIO 写入转换应用于无界 PCollection 时,BigQueryIO 使用流式插入。
在您的情况下,您正在从无限源(Pubsub)读取,因此在这种情况下它始终是流式写入。加窗不会改变数据的性质。
我能想到的一种解决方法是拆分管道,例如,流式管道将写入某个存储(GCS)上的文件集合,然后另一个管道将读取并上传这些文件(文件是有界的)。
| 归档时间: |
|
| 查看次数: |
3643 次 |
| 最近记录: |