Posted by bis*_*smi

Debezium Kafka CDC connector sets the key as Avro even though the key converter is StringConverter

This is my connector configuration:

curl -s -k -X POST http://***************:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "mysql-cdc-CUSTOMER_DETAILS-007",
  "config": {
    "tasks.max":"2",
    "poll.interval.ms":"500",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "dbnode",
    "database.port": "3306",
    "database.user": "**********",
    "database.password": "###########",
    "database.server.name": "dbnode",
    "database.whitelist": "device_details",
    "database.history.kafka.bootstrap.servers": "**********:9092",
    "database.history.kafka.topic": "schema-changes.device_details",
    "include.schema.changes":"true",
    "table.whitelist":"device_details.tb_customermst",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://************:8081",
    "internal.key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "internal.key.converter.schemas.enable":"false",
    "internal.value.converter.schemas.enable":"false"
  }
}' | jq '.'

When I consume the data from ksql, it is displayed as follows:

ksql> print 'Device_Details.device_details.tb_customermst' from beginning;
Format:AVRO
5/2/20 2:08:34 PM IST, Struct{customerid=10001}, {"before": null, "after": {"customerid": 10001, "firstname": "Klara", "lastname": "Djokic", "emailid": "klara.djokic007@iillii.org", "mobilenumber": "+1 …
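
One possible reading of the output above, offered as an assumption rather than a confirmed diagnosis: `Format:AVRO` may describe only the value, while the key `Struct{customerid=10001}` looks like the string form of Debezium's structured key, which is what StringConverter would emit when it is handed a Struct. If that is the case, one option is to unwrap the key with Kafka Connect's built-in `ExtractField$Key` transform before it reaches the converter. In the sketch below, the alias `extractKey` is an arbitrary name and `customerid` is taken from the key shown in the ksql output; the snippet only prints the extra properties that would be merged into the `"config"` object.

```shell
# Sketch only: extra properties to merge into the "config" object of the connector.
# "extractKey" is an arbitrary (hypothetical) alias for the transform.
# "customerid" is assumed from the key printed by ksql: Struct{customerid=10001}.
extra_config=$(cat <<'EOF'
"transforms": "extractKey",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "customerid"
EOF
)

# Print the fragment so it can be pasted into the JSON payload.
printf '%s\n' "$extra_config"
```

This sketch does not account for records whose key has no `customerid` field, such as those on the schema-change topic enabled by `include.schema.changes`; those may need to be handled separately.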

mysql apache-kafka apache-kafka-connect debezium confluent-platform

2 votes, 1 answer, 2935 views