小编ben*_*ben的帖子

BigQueryIO - 无法使用DynamicDestination和CREATE_IF_NEEDED进行无界PCollection和FILE_LOADS

我的工作流程:KAFKA - >数据流 - > BigQuery

鉴于在我的情况下低延迟并不重要,我使用FILE_LOADS来降低成本.我正在使用带有DynamicDestination的BigQueryIO.Write(每小时一个新表,当前小时作为后缀).

这个BigQueryIO.Write配置如下:

.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(triggeringFrequency)
.withNumFileShards(100)
Run Code Online (Sandbox Code Playgroud)

第一个表已成功创建并写入.但是,从未创建以下表格,我得到以下表格:

(99e5cd8c66414e7a): java.lang.RuntimeException: Failed to create load job with id prefix 5047f71312a94bf3a42ee5d67feede75_5295fbf25e1a7534f85e25dcaa9f4986_00001_00023, reached max retries: 3, last failed load job: {
  "configuration" : {
    "load" : {
      "createDisposition" : "CREATE_NEVER",
      "destinationTable" : {
        "datasetId" : "dev_mydataset",
        "projectId" : "myproject-id",
        "tableId" : "mytable_20180302_16"
      },
Run Code Online (Sandbox Code Playgroud)

对于第一个表中的createDisposition会使用的CREATE_IF_NEEDED符合规定,但随后此参数不考虑和CREATE_NEVER默认使用.

我还在JIRA上创建了这个问题.

google-bigquery google-cloud-platform google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
368
查看次数

BigQueryIO - 使用流和FILE_LOADS写性能

我的管道:Kafka - > Dataflow streaming(Beam v2.3) - > BigQuery

鉴于在我的情况下低延迟并不重要,我使用FILE_LOADS来降低成本,如下所示:

BigQueryIO.writeTableRows()
  .withJsonSchema(schema)
  .withWriteDisposition(WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
  .withMethod(Method.FILE_LOADS)
  .withTriggeringFrequency(triggeringFrequency)      
  .withCustomGcsTempLocation(gcsTempLocation)
  .withNumFileShards(numFileShards) 
  .withoutValidation()
  .to(new SerializableFunction[ValueInSingleWindow[TableRow], TableDestination]() {
    def apply(element: ValueInSingleWindow[TableRow]): TableDestination = {
      ...
    }
  }
Run Code Online (Sandbox Code Playgroud)

这个数据流步骤在管道中引入了一个总是更大的延迟,因此它无法跟上Kafka吞吐量(小于50k事件/秒),即使有40 n1-standard-s4名工作人员.如下面的屏幕截图所示,此步骤的系统滞后非常大(接近管道正常运行时间),而Kafka系统滞后只有几秒钟.

BigQueryIO.Write引入了系统延迟

如果我理解正确,Dataflow会将元素写入gcsTempLocation中的numFileShards,并且每个triggeringFrequency都会启动一个加载作业,将它们插入到BigQuery中.例如,如果我选择5分钟的触发bq ls -a -j频率,我可以看到(有)所有负载作业需要不到1分钟才能完成.但仍然是这一步骤引入了越来越多的延迟,导致Kafka消耗越来越少的元素(由于bcackpressure).增加/减少numFileShardstriggeringFrequency并不能解决问题.

我没有手动指定任何窗口,我只是默认窗口.文件不会在gcsTempLocation中累积.

知道这里出了什么问题吗?

google-bigquery google-cloud-platform google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
895
查看次数