BigQueryIO.Write serialization with withJsonSchema

Joe*_*e M 5 java google-cloud-dataflow apache-beam

My Beam pipeline has a BigQueryIO.Write stage that is built by calling .withJsonSchema(String):

inputStream.apply(
    "save-to-bigquery",
    BigQueryIO.<Event>write()
        .withJsonSchema(jsonSchema)
        .to((ValueInSingleWindow<Event> input) ->
                new TableDestination(
                        "table_name$" + PARTITION_SELECTOR.print(input.getValue().getMetadata().getTimestamp()),
                        null)
        )
        .withFormatFunction((Event event) ->
                new TableRow()
                        .set("id", event.getMetadata().getUuid())
                        .set("insertId", event.getMetadata().getUuid())
                        .set("account_id", event.getAccountId())
                        ...
                        .set("timestamp", ISODateTimeFormat.dateHourMinuteSecondMillis()
                                .print(event.getMetadata().getTimestamp())))
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    );

I'm running the pipeline with the DataflowRunner, and when this stage executes I get the following error:

java.lang.IllegalArgumentException: 
        com.google.api.client.json.JsonParser.parseValue(JsonParser.java:889)
        com.google.api.client.json.JsonParser.parse(JsonParser.java:382)
        com.google.api.client.json.JsonParser.parse(JsonParser.java:336)
        com.google.api.client.json.JsonParser.parse(JsonParser.java:312)
        com.google.api.client.json.JsonFactory.fromString(JsonFactory.java:187)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString(BigQueryHelpers.java:156)
        org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantSchemaDestinations.getSchema(DynamicDestinationsHelpers.java:163)
        org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantSchemaDestinations.getSchema(DynamicDestinationsHelpers.java:150)
        org.apache.beam.sdk.io.gcp.bigquery.CreateTables$1.processElement(CreateTables.java:103)
Caused by: java.lang.IllegalArgumentException: expected collection or array type but got class com.google.api.services.bigquery.model.TableSchema
        com.google.api.client.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:148)
        com.google.api.client.util.Preconditions.checkArgument(Preconditions.java:69)
        com.google.api.client.json.JsonParser.parseValue(JsonParser.java:723)
        com.google.api.client.json.JsonParser.parse(JsonParser.java:382)
        com.google.api.client.json.JsonParser.parse(JsonParser.java:336)
        com.google.api.client.json.JsonParser.parse(JsonParser.java:312)
        com.google.api.client.json.JsonFactory.fromString(JsonFactory.java:187)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString(BigQueryHelpers.java:156)
        org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantSchemaDestinations.getSchema(DynamicDestinationsHelpers.java:163)
        org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantSchemaDestinations.getSchema(DynamicDestinationsHelpers.java:150)
        org.apache.beam.sdk.io.gcp.bigquery.CreateTables$1.processElement(CreateTables.java:103)
        org.apache.beam.sdk.io.gcp.bigquery.CreateTables$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
        org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
        com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
        com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:183)
        org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
        org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
        org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
        org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
        org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:62)
        org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        .....

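It looks as if the JSON is read correctly when the pipeline is created and serialized, but at execution time a deserialized JSON representation is passed in place of the JSON string. I produce the JSON string by reading a resource file with Guava's Resources class: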

String jsonSchema;
try {
    jsonSchema = Resources.toString(Resources.getResource("path_to_json_schema"), Charsets.UTF_8);
} catch (IOException e) {
    throw new RuntimeException("Failed to load JSON schema", e);
}

How can I fix this serialization problem?

jkf*_*kff 6

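Looking at the code that throws the exception, this appears to be a JSON parsing failure; your JSON schema is most likely malformed. According to the documentation, it should look like this: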

{
  "fields": [
    {
      "name": string,
      "type": string,
      "mode": string,
      "fields": [
        (TableFieldSchema)
      ],
      "description": string
    }
  ]
}

For example:

{
  "fields": [
    {
      "name": "foo",
      "type": "INTEGER"
    },
    {
      "name": "bar",
      "type": "STRING"
    }
  ]
}
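If hand-editing the schema file is error-prone, one option is to generate the string in code so the outer object is always present. The following is only a minimal sketch using the Java standard library: the class SchemaJsonBuilder and its method toJson are hypothetical names (not part of Beam), and it covers flat schemas with just "name" and "type".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of Beam): renders a flat schema as {"fields":[...]}. */
final class SchemaJsonBuilder {
    private SchemaJsonBuilder() {}

    /** Takes column name -> BigQuery type; the map's iteration order becomes the field order. */
    static String toJson(Map<String, String> columns) {
        // The prefix and suffix guarantee the outer {"fields": ...} object.
        StringJoiner fields = new StringJoiner(",", "{\"fields\":[", "]}");
        for (Map.Entry<String, String> column : columns.entrySet()) {
            String name = column.getKey();
            String type = column.getValue();
            // Assumption: names and types are plain identifiers, so no JSON escaping is needed.
            if (!name.matches("[A-Za-z_][A-Za-z0-9_]*") || !type.matches("[A-Z0-9]+")) {
                throw new IllegalArgumentException("Unsupported name or type: " + name + " / " + type);
            }
            fields.add("{\"name\":\"" + name + "\",\"type\":\"" + type + "\"}");
        }
        return fields.toString();
    }
}
```

Passing a LinkedHashMap with foo -> INTEGER and bar -> STRING yields the same structure as the example above, as a single-line string that could be handed to .withJsonSchema(String).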

Looking at the code of the JSON parser that fails, I suspect you are missing the outer {"fields": ...} and your JSON string contains only [...].
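If that is the case, the cleanest fix is to correct the resource file. As a stopgap, the string could be wrapped before it is handed to the pipeline. The sketch below uses only the Java standard library; SchemaJsonNormalizer and normalize are hypothetical names, and the check inspects only the first non-blank character rather than parsing the JSON.

```java
/** Hypothetical helper (not part of Beam): ensures a schema string has the outer {"fields": ...} object. */
final class SchemaJsonNormalizer {
    private SchemaJsonNormalizer() {}

    static String normalize(String json) {
        String trimmed = json.trim();
        if (trimmed.startsWith("[")) {
            // A bare array of field definitions: wrap it in the object form shown above.
            return "{\"fields\": " + trimmed + "}";
        }
        if (trimmed.startsWith("{")) {
            // Already an object; assumed to carry a top-level "fields" key (not verified here).
            return trimmed;
        }
        throw new IllegalArgumentException("Schema JSON must start with '{' or '['");
    }
}
```

With this in place, the write could be configured with .withJsonSchema(SchemaJsonNormalizer.normalize(jsonSchema)), which also fails at pipeline construction time, rather than on a Dataflow worker, if the file is something else entirely.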