Kafka Connect - 无法刷新,在等待生产者刷新未完成的消息时超时

Dav*_*vid 15 apache-kafka kafka-producer-api apache-kafka-connect

我正在尝试在 BULK 模式下使用具有以下属性的 Kafka Connect JDBC Source Connector。

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=timestamp
connection.password=XXXXX
validate.non.null=false
tasks.max=1
producer.buffer.memory=2097152
batch.size=1000
producer.enable.idempotence=true
offset.flush.timeout.ms=300000
table.types=TABLE,VIEW
table.whitelist=materials
offset.flush.interval.ms=5000
mode=bulk
topic.prefix=mysql-
connection.user=kafka_connect_user
poll.interval.ms=200000
connection.url=jdbc:mysql://<DBNAME>
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Run Code Online (Sandbox Code Playgroud)

我收到有关提交偏移量的以下错误,更改各种参数似乎影响不大。

[2019-04-04 12:42:14,886] INFO WorkerSourceTask{id=SapMaterialsConnector-0} flushing 4064 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-04-04 12:42:19,886] ERROR WorkerSourceTask{id=SapMaterialsConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 712 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
Run Code Online (Sandbox Code Playgroud)

Gio*_*ous 11

该错误表示有很多消息被缓冲,在超时之前无法刷新。


要解决此问题,您可以

  • offset.flush.timeout.ms在您的 Kafka Connect Worker Configs 中增加配置参数
  • 或者您可以通过减少producer.buffer.memoryKafka Connect Worker 配置来减少缓冲的数据量。当您有相当大的消息时,这将成为最佳选择。