Google Cloud - Pub/Sub into DataFlow

Joh*_*own 0 json google-bigquery google-cloud-platform google-cloud-pubsub google-cloud-dataflow

我通过REST请求调用Pub/Sub.我正在尝试将列化数据放在Pub/Sub上的主题上,然后进入DataFlow,最后进入Big Query,其中定义了Table.

这是所述JSON数据的布局:

[
  {
    "age": "58",
    "job": "management",
    "marital": "married",
    "education": "tertiary",
    "default": "no",
    "balance": "2143",
    "housing": "yes",
    "loan": "no",
    "contact": "unknown",
    "day": "5",
    "month": "may",
    "duration": "261",
    "campaign": "1",
    "pdays": "-1",
    "previous": "0",
    "poutcome": "unknown",
    "y": "no"
    }
]
Run Code Online (Sandbox Code Playgroud)

现在,为了形成正确的JSON主体,需要进入以下Pub/Sub识别请求:

{
    "messages": [{
        "attributes": {
            "key": "iana.org/language_tag",
            "value": "en"
        },
        "data": "%DATA%"
    }]
}
Run Code Online (Sandbox Code Playgroud)

现在,Pub/Sub REST引用声明需要将"Data"字段转换为Base64,这就是我所做的,最终的JSON格式如下所示(%DATA%被原始消息的Base64转换替换)数据)

{
    "messages": [{
        "attributes": {
            "key": "iana.org/language_tag",
            "value": "en"
        },
        "data": "Ww0KICB7DQogICAgImFnZSI6ICI1OCIsDQogICAgImpvYiI6ICJtYW5hZ2VtZW50IiwNCiAgICAibWFyaXRhbCI6ICJtYXJyaWVkIiwNCiAgICAiZWR1Y2F0aW9uIjogInRlcnRpYXJ5IiwNCiAgICAiZGVmYXVsdCI6ICJubyIsDQogICAgImJhbGFuY2UiOiAiMjE0MyIsDQogICAgImhvdXNpbmciOiAieWVzIiwNCiAgICAibG9hbiI6ICJubyIsDQogICAgImNvbnRhY3QiOiAidW5rbm93biIsDQogICAgImRheSI6ICI1IiwNCiAgICAibW9udGgiOiAibWF5IiwNCiAgICAiZHVyYXRpb24iOiAiMjYxIiwNCiAgICAiY2FtcGFpZ24iOiAiMSIsDQogICAgInBkYXlzIjogIi0xIiwNCiAgICAicHJldmlvdXMiOiAiMCIsDQogICAgInBvdXRjb21lIjogInVua25vd24iLA0KICAgICJ5IjogIm5vIg0KICAgIH0NCl0="
    }]
}
Run Code Online (Sandbox Code Playgroud)

Pub/Sub允许这些数据,然后将其放入DataFlow,但这就是一切都中断的地方.DataFlow尝试反序列化信息,但失败并显示以下消息:

(efdf538fc01f50b0): java.lang.RuntimeException: Unable to parse input
        com.google.cloud.teleport.templates.common.BigQueryConverters$JsonToTableRow$1.apply(BigQueryConverters.java:58)
        com.google.cloud.teleport.templates.common.BigQueryConverters$JsonToTableRow$1.apply(BigQueryConverters.java:47)
        org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of com.google.api.services.bigquery.model.TableRow out of START_ARRAY token
 at [Source: [{"age":"32","job":"\"admin.\"","marital":"\"single\"","education":"\"secondary\"","default":"\"no\"","balance":"5","housing":"\"yes\"","loan":"\"no\"","contact":"\"unknown\"","day":"12","month":"\"may\"","duration":"593","campaign":"2","pdays":"-1","previous":"0","poutcome":"\"unknown\"","y":"\"no\""}]; line: 1, column: 1]
Run Code Online (Sandbox Code Playgroud)

我认为这与"data":字段的格式化有关,但我尝试了其他方法,但我无法得到任何工作.

Joh*_*own 5

经过进一步的实验,问题确实是如何格式化JSON.通过删除开放[和关闭]DataFlow确实能够识别数据然后将其放入BigQuery.