如何获取,设置或重置Kafka Connect连接器/任务/接收器的偏移量?
我可以使用/usr/bin/kafka-consumer-groups运行的工具kafka.admin.ConsumerGroupCommand来查看我所有常规Kafka消费者群体的偏移量.但是,Kafka Connect任务和组不会显示此工具.
同样,我可以使用zookeeper-shell连接到Zookeeper,我可以看到常规Kafka使用者组的zookeeper条目,但不能查看Kafka Connect接收器的条目.
Ewe*_*ava 11
从0.10.0.0开始,Connect不提供用于管理偏移的API.这是我们希望将来改进的东西,但还没有.这ConsumerGroupCommand将是管理Sink连接器偏移的正确工具.需要注意的是源连接器偏移存储在连接一个特殊的偏移主题(因为它们是由源系统定义他们不喜欢卡夫卡正常偏移,看offset.storage.topic在工作人员的配置文件),并自沉连接器采用了新的消费者,他们赢了将它们的偏移存储在Zookeeper中 - 所有现代客户端都使用基于Kafka的本地偏移存储.该ConsumerGroupCommand可以用这些偏移工作,你只需要通过--new-consumer选项).
您无法设置偏移量,但可以使用kafka-consumer-groups.sh工具“滚动”前馈。
您的连接器的使用者组的名称为connect-*CONNECTOR NAME*,但您可以仔细检查:
unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --list
Run Code Online (Sandbox Code Playgroud)
查看当前偏移量:
unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --group connect-*CONNECTOR NAME* --describe
Run Code Online (Sandbox Code Playgroud)
向前移动偏移:
unset JMX_PORT; ./bin/kafka-console-consumer.sh --bootstrap-server *KAFKA HOSTS* --topic *TOPIC* --max-messages 10000 --consumer-property group.id=connect-*CONNECTOR NAME* > /dev/null
Run Code Online (Sandbox Code Playgroud)
我想您也可以通过首先使用--delete标志删除消费者组来向后移动偏移量。
不要忘记通过 Kafka Connect REST API 暂停和恢复连接器。
| 归档时间: |
|
| 查看次数: |
7694 次 |
| 最近记录: |