如何设置JDBC源连接器(kafka)的key?

din*_*blk 4 apache-kafka-connect

我正在使用 a 从数据库表中读取数据Kafka Source JDBC connector并将其发布到主题test-mysql-petai

数据库表有 2 个字段,其中Id是主键:

+---------+-------------+------+-----+---------+----------------+
| Field   | Type        | Null | Key | Default | Extra          |
+---------+-------------+------+-----+---------+----------------+
| id      | int(11)     | NO   | PRI | NULL    | auto_increment |
| name    | varchar(20) | YES  |     | NULL    |                |
+---------+-------------+------+-----+---------+----------------+
Run Code Online (Sandbox Code Playgroud)

我需要该字段的值id作为主题的键。我尝试向 jdbc 连接器属性添加转换。

JDBCConnector.属性:

name=jdbc-source-connector    
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1   
connection.url=jdbc:mysql://127.0.0.1:3306/test?user=dins&password=pw&serverTimezone=UTC
table.whitelist=petai 
mode=incrementing
incrementing.column.name=id    
schema.pattern=""    
transforms=createKey,extractInt    
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey    
transforms.createKey.fields=id    
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key    
transforms.extractInt.field=id    
topic.prefix=test-mysql-jdbc-
Run Code Online (Sandbox Code Playgroud)

但是,当我使用消费者读取键和值时,我得到以下信息:

Key = {"schema":{"type":"int32","optional":false},"payload":61} 
Value ={"id":61,"name":"ttt"}
Run Code Online (Sandbox Code Playgroud)

我需要得到以下信息:

Key = 61    
Value ={"id":61,"name":"ttt"}
Run Code Online (Sandbox Code Playgroud)

我究竟做错了什么?任何帮助表示赞赏。

谢谢。

Isk*_*der 5

如果您不想在键中包含架构,您可以通过设置 告诉 Kafka Connect key.converter.schemas.enable=false

\n\n

有关详细说明,请参阅Robin Moffatt 的 Kafka Connect Deep Dive \xe2\x80\x93 转换器和序列化说明

\n