Kafka Connect - JDBC 源连接器 - 设置 Avro 架构

use*_*ist 5 avro apache-kafka apache-kafka-connect confluent-schema-registry

如何使 Kafka Connect JDBC 连接器连接到预定义的 Avro 架构?它在创建连接器时创建一个新版本。我正在阅读 DB2 并放入 Kafka 主题。我在创建过程中设置架构名称和版本,但它不起作用!这是我的连接器设置:

    {
      “名称”:“kafka-connect-jdbc-db2-tst-2”,
      “配置”:{
        "connector.class": "io.confluence.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:db2://mydb2:50000/testdb",
        "连接.用户": "DB2INST1",
        "连接密码": "12345678",
        “查询”:“从TEST.MYVIEW4中选择CORRELATION_ID”,
        “模式”:“递增”,
        "incrementing.column.name": "CORRELATION_ID",
        "validate.non.null": "假",
        "topic.prefix": "tst-4" ,
        "auto.register.schemas": "假",
        "use.latest.version": "true",
        "transforms": "RenameField,SetSchemaMetadata",
        "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.RenameField.renames": "CORRELATION_ID:id",
        "transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
        "transforms.SetSchemaMetadata.schema.name": "foo.bar.MyMessage",
        "transforms.SetSchemaMetadata.schema.version": "1"
    
      }
    
    }

以下是架构:V.1 是我的,V.2 是由 JDBC 源连接器创建的:

    $curl 本地主机:8081/subjects/tst-4-value/versions/1 | jq .
    
    {
      “主题”:“tst-4-值”,
      “版本”:1,
      “ID”:387,
      "schema": "{"type":"record","name":"MyMessage",
    "namespace":"foo.bar","fields":[{"name":"id","type":"int"}]}"
    }
    
    $curl 本地主机:8081/subjects/tst-4-value/versions/2 | jq .
    {
      “主题”:“tst-4-值”,
      “版本”:2,
      “ID”:386,
      "schema": "{"type":"record","name":"MyMessage","namespace":"foo.bar",
       "fields":[{"name":"id","type":"int"}],
       “连接版本”:1,
       "connect.name":"foo.bar.MyMessage"
    }”
    }

知道如何强制 Kafka 连接器使用我的架构吗?提前致谢,