小编San*_*yak的帖子

Kafka-confluent:如何在 JDBC 接收器连接器中使用 pk.mode=record_key 进行更新插入和删除模式?

在 Kafka confluent 中,我们如何在使用pk.mode=record_keyMySQL 表中的复合键的同时使用源作为 CSV 文件的 upsert ?使用pk.mode=record_values. 是否有任何额外的配置需要完成?

如果我尝试使用pk.mode=record_key. 错误 - 由以下原因引起org.apache.kafka.connect.errors.ConnectException::需要定义一个 PK 列,因为记录的关键架构是一种原始类型。以下是我的 JDBC 接收器连接器配置:

    {
    "name": "<name>",
    "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "<topic name>",
    "connection.url": "<url>",
    "connection.user": "<user name>",
    "connection.password": "*******",
    "insert.mode": "upsert",
    "batch.size": "50000",
    "table.name.format": "<table name>",
    "pk.mode": "record_key",
    "pk.fields": "field1,field2",
    "auto.create": "true",
    "auto.evolve": "true",
    "max.retries": "10",
    "retry.backoff.ms": "3000",
    "mode": "bulk",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect confluent-platform

1
推荐指数
1
解决办法
2793
查看次数