如何将没有模式的数据发送到 kafka - confluence jdbc - 接收器使用?

Ers*_*har 5 apache-kafka apache-kafka-connect confluent-platform

我使用 conluent jdbc-sink 将数据从 kafka 加载到 oracle。

但我用数据来写我的价值模式。

我不想用数据编写模式,如何在 kafka 主题上编写模式,然后我只想从客户端发送数据?

提前致谢

json数据

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "field": 'ID',
                "type": "int32",
                "optional": False
            },
            {
                "field": 'PRODUCT',
                "type": "string",
                "optional": True
            },
            {
                "field": 'QUANTITY',
                "type": "int32",
                "optional": True
            },
            {
                "field": 'PRICE',
                "type": "int32",
                "optional": True
            }
        ],
        "optional": True,
        "name": "myrecord"
    },
    "payload": {
        "ID": 1071,
        "PRODUCT": 'ersin',
        "QUANTITIY": 1071,
        "PRICE": 1453
   }
Run Code Online (Sandbox Code Playgroud)

蟒蛇代码:

producer.send(topic, key=b'1071'
              , value=json.dumps(v, default=json_util.default).encode('utf-8'))
Run Code Online (Sandbox Code Playgroud)

我该如何解决这个问题?

提前致谢

Gio*_*ous 5

如果要使用 JDBC 接收器连接器,则必须提供架构。这可以通过三种方式实现:

  • 在启用架构的情况下使用 JSON
  • 使用 Avro 和架构注册表
  • 将 JSON 架构与架构注册表结合使用

您当前正在使用启用了架构的 JSON,这需要将架构与实际负载一起发送。实现您的要求的唯一方法是使用 Avro 和 Confluence Schema Registry,以便您的架构在架构注册表中注册。这样,您就不需要每次都发送有效负载模式。

另一种选择是使用带有架构注册表的 JSON ( #1289 )。对于 Kafka Connect,您可以使用JsonSchemaConverter,对于 Java 消费者和生产者,您可以使用KafkaJsonSchemaSerializerKafkaJsonSchemaDeserializer