重新启动Kafka Connect S3接收器任务后,它重新开始从主题开头一直写入并写入旧记录的重复副本.换句话说,Kafka Connect似乎失去了它的位置.
所以,我想Kafka Connect将当前的偏移位置信息存储在内部connect-offsets主题中.那个话题是空的,我认为这是问题的一部分.
另外两个内部主题connect-statuses和connect-configs不为空.connect-statuses有52个条目.connect-configs有6个条目; 三为每两个宿连接器的我已经配置:connector-<name>,task-<name>-0,commit-<name>.
在运行此文档之前,我手动创建了文档中指定的内部Kafka Connect主题:
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
Run Code Online (Sandbox Code Playgroud)
我可以验证connect-offsets主题似乎是正确创建的:
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: …Run Code Online (Sandbox Code Playgroud)