"Field does not exist" in a transform to extract the key with Debezium

4it*_*med 5 apache-kafka apache-kafka-connect debezium ksqldb

I am trying to create a Debezium MySQL connector with a transformation that extracts the key.

Before the key transformation:

create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'before',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "transforms" = 'unwrap',
    "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope');

The resulting topic contains:

> rowtime: 2020/05/20 16:47:23.354 Z, key: [St@5778462697648631933/8247607644536792125], value: {"id": "P195910", "price": "1511.64"}

When key.converter is set to JSON, the key becomes {"id": "P195910"}.

So I want to extract id from the key and make it a plain string key.

Expected result:

rowtime: 2020/05/20 16:47:23.354 Z, 
key: 'P195910', 
value: {"id": "P195910", "price": "1511.64"}   

When I try a transformation with ExtractField or ValueToKey, I get:

DataException: Field does not exist: id

This is the connector definition I tried with ValueToKey:

create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'after',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "key.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "key.converter.schemas.enable" = 'TRUE',
    "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable" = 'TRUE',
    "transforms" = 'unwrap,createkey',
    "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope',
    "transforms.createkey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.createkey.fields" = 'id'
    );

It results in the following error in my Kafka Connect log:

Caused by: org.apache.kafka.connect.errors.DataException: Field does not exist: id
        at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:89)
        at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)

4it*_*med 7

Changing the transform type from UnwrapFromEnvelope to ExtractNewRecordState solved the problem on Debezium MySQL CDC Connector version 1.1.0:

"transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState'
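For reference, here is a rough sketch of how the whole connector might look with that change, if the goal is the plain string key from the question. It is an untested combination, not a verified configuration. It swaps ValueToKey for Kafka Connect's ExtractField$Key transform, which pulls id out of the key struct Debezium already produces, and it writes the key with StringConverter. The transform alias extractkey is just an illustrative name. I also set include.schema.changes to 'false' on the assumption that schema change events have no id field in their key and would make the key extraction fail; adjust that if you need the schema change topic.

```sql
-- Sketch only: same settings as the question, with ExtractNewRecordState,
-- a key-extraction transform, and a string key converter.
-- The alias "extractkey" is arbitrary; include.schema.changes = 'false' is an assumption.
CREATE SOURCE CONNECTOR mysql WITH (
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'after',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'false',
    "key.converter" = 'org.apache.kafka.connect.storage.StringConverter',
    "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable" = 'true',
    "transforms" = 'unwrap,extractkey',
    "transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState',
    "transforms.extractkey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    "transforms.extractkey.field" = 'id'
);
```

If a connector named mysql already exists, it would have to be dropped first (DROP CONNECTOR mysql;) before this statement can be run.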


Rob*_*att 0

Since you are using ksqlDB here, you need to set the source connector to write the key as a String:

key.converter=org.apache.kafka.connect.storage.StringConverter
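One way to check how the key is actually written is to print a record from the topic in ksqlDB. The topic name below is an assumption based on Debezium's serverName.databaseName.tableName naming and the "after" connector from the question; substitute your own topic if it differs.

```sql
-- Print one record from the start of the topic and inspect its key.
PRINT 'after.deepprices.deepprices' FROM BEGINNING LIMIT 1;
```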

  • UnwrapFromEnvelope was deprecated and then removed in Debezium 1.2.0.Alpha1 (2 upvotes)