小编4it*_*med的帖子

使用 Debezium 提取密钥的转换中不存在字段

我正在尝试创建一个Debezium MySQL连接器,并进行转换以提取密钥。

关键转换之前:

create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'before',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "transforms" = 'unwrap',
    "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope');
Run Code Online (Sandbox Code Playgroud)

主题结果是:

> rowtime: 2020/05/20 16:47:23.354 Z, key: [St@5778462697648631933/8247607644536792125], value: {"id": "P195910", "price": "1511.64"}
Run Code Online (Sandbox Code Playgroud)

当 key.converter 设置为JSON时,Key 变为{"id": "P195910"}

所以,我想从 key 中提取 id 并将其设为字符串键:

预期成绩 : …

apache-kafka apache-kafka-connect debezium ksqldb

5
推荐指数
2
解决办法
4584
查看次数