Kafka JDBC Sink connector with JSON schema not working

pra*_*has 2 json jdbc apache-kafka apache-kafka-connect

I am using the latest Kafka with the Confluent JDBC sink connector, and sending a very simple JSON message:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "msg"
            }
        ],
        "optional": false,
        "name": "msgschema"
    },
    "payload": {
        "id": 222,
        "msg": "hi"
    }
}

But I am getting this error:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

JSONLint says the JSON is valid. I have set schemas.enable=true in the Kafka Connect configuration. Any pointers?

Rob*_*att 5

You need to tell Connect that your schema is embedded in the JSON you are sending.

You currently have:

value.converter=org.apache.kafka.connect.json.JsonConverter 

but you also need:

value.converter.schemas.enable=true
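
For context, here is what the two settings look like together in a sink connector configuration. This is only a minimal sketch: the connector name, topic, and JDBC connection details below are assumptions and not taken from the question; the two converter lines are the part that matters.

# Hypothetical JDBC sink config; name, topic, and connection details are placeholders
name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=msgtopic
connection.url=jdbc:sqlite:test.db
auto.create=true

# Parse the message value as JSON and expect the schema/payload envelope shown above
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

With value.converter.schemas.enable=true, every message value must contain the top-level "schema" and "payload" fields, exactly as in the example message above; plain JSON without that envelope will produce the DataException shown in the question.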