BigQueryIO - 无法使用DynamicDestination和CREATE_IF_NEEDED进行无界PCollection和FILE_LOADS

ben*_*ben 5 google-bigquery google-cloud-platform google-cloud-dataflow apache-beam

我的工作流程:KAFKA - >数据流 - > BigQuery

鉴于在我的情况下低延迟并不重要,我使用FILE_LOADS来降低成本.我正在使用带有DynamicDestination的BigQueryIO.Write(每小时一个新表,当前小时作为后缀).

这个BigQueryIO.Write配置如下:

.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(triggeringFrequency)
.withNumFileShards(100)
Run Code Online (Sandbox Code Playgroud)

第一个表已成功创建并写入.但是,从未创建以下表格,我得到以下表格:

(99e5cd8c66414e7a): java.lang.RuntimeException: Failed to create load job with id prefix 5047f71312a94bf3a42ee5d67feede75_5295fbf25e1a7534f85e25dcaa9f4986_00001_00023, reached max retries: 3, last failed load job: {
  "configuration" : {
    "load" : {
      "createDisposition" : "CREATE_NEVER",
      "destinationTable" : {
        "datasetId" : "dev_mydataset",
        "projectId" : "myproject-id",
        "tableId" : "mytable_20180302_16"
      },
Run Code Online (Sandbox Code Playgroud)

对于第一个表中的createDisposition会使用的CREATE_IF_NEEDED符合规定,但随后此参数不考虑和CREATE_NEVER默认使用.

我还在JIRA上创建了这个问题.

dse*_*sto 1

根据Apache Beam 的 BigQueryIO 的文档,该方法要求在使用CREATE_IF_NEEDED时使用前提条件BigQueryIO.Write.CreateDisposition提供表模式。.withSchema()

正如数据流文档中所述:

请注意,如果您将 CREATE_IF_NEEDED 指定为 CreateDisposition 并且不提供 TableSchema,则如果目标表不存在,转换可能会在运行时失败并出现 java.lang.IllegalArgumentException

文档指出的错误与您收到的错误不同(您收到java.lang.RuntimeException),但根据BigQueryIO.Write()您共享的配置,您没有指定任何表模式,因此,如果表丢失,作业很容易失败。

因此,作为解决问题的第一个措施,您应该创建与要TableSchema()加载到 BQ 的数据相匹配的表架构,然后.withSchema(schema)相应地使用前提条件:

List<TableFieldSchema> fields = new ArrayList<>();
// Add fields like:
fields.add(new TableFieldSchema().setName("<FIELD_NAME>").setType("<FIELD_TYPE>"));
TableSchema schema = new TableSchema().setFields(fields);

// BigQueryIO.Write configuration plus:
    .withSchema(schema)
Run Code Online (Sandbox Code Playgroud)