小编eeb*_*ruu的帖子

使用 Dataflow 批量插入 Bigquery

我正在使用 apache beam pipeline,我想使用 python 批量插入到 bigquery。我的数据来自无限制的 Pub/Sub。根据我的研究结果,带有触发器的 GlobalWindows 应该可以解决我的问题。我尝试使用窗口化管道,但它仍然进行流式插入。我的管道代码如下:

p2 = (p | 'Read ' >> beam.io.ReadFromPubSub(subscription=subscription_path,
    with_attributes=True,
    timestamp_attribute=None,id_label=None)
       | 'Windowing' >>  beam.WindowInto(window.GlobalWindows(),
           trigger=Repeatedly(
                   AfterAny(
                AfterCount(100),
           AfterProcessingTime(1 * 60))), 
        accumulation_mode=AccumulationMode.DISCARDING)
      | 'Process ' >> beam.Map(getAttributes))
p3 = (p2 | 'Filter ' >> beam.Filter(lambda msg: (("xx" in msg) and (msg["xx"].lower() == "true")))
         | 'Delete ' >> beam.Map(deleteAttribute)
         | 'Write '  >> writeTable(bq_table_test, bq_batch_size))

def writeTable(table_name):
return beam.io.WriteToBigQuery(
    table=table_name,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    batch_size=100)
Run Code Online (Sandbox Code Playgroud)

我正在从计费报告中检查插入是批量还是流式插入。当 Streming 插入使用量增加时,我了解到批量插入没有发生。是否还有其他功能可以检查插入是流式插入还是批量插入?还有如何批量插入到 bigquery ?

python google-bigquery google-cloud-platform google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
3643
查看次数

当目标表在Bigquery上具有覆盖首选项时,所有列都有NULLABLE问题

我正在运行查询以更改列数据类型并选择目标表查询表本身.我选择写优先"覆盖表".表所有列都是REQUIRED,表不为空.但运行查询后,所有列模式都会更改为NULLABLE.我的演员查询如下:

SELECT CAST(id AS STRING) as id, column1, column2 FROM dataset.mytable;

它总是这样,或者我犯了错误?

google-bigquery

0
推荐指数
1
解决办法
89
查看次数