Kafka Connect Offsets.get/set方法?

cla*_*lay 11 apache-kafka

如何获取,设置或重置Kafka Connect连接器/任务/接收器的偏移量?

我可以使用/usr/bin/kafka-consumer-groups运行的工具kafka.admin.ConsumerGroupCommand来查看我所有常规Kafka消费者群体的偏移量.但是,Kafka Connect任务和组不会显示此工具.

同样,我可以使用zookeeper-shell连接到Zookeeper,我可以看到常规Kafka使用者组的zookeeper条目,但不能查看Kafka Connect接收器的条目.

Ewe*_*ava 11

从0.10.0.0开始,Connect不提供用于管理偏移的API.这是我们希望将来改进的东西,但还没有.这ConsumerGroupCommand将是管理Sink连接器偏移的正确工具.需要注意的是源连接器偏移存储在连接一个特殊的偏移主题(因为它们是由源系统定义他们不喜欢卡夫卡正常偏移,看offset.storage.topic工作人员的配置文件),并自沉连接器采用了新的消费者,他们赢了将它们的偏移存储在Zookeeper中 - 所有现代客户端都使用基于Kafka的本地偏移存储.该ConsumerGroupCommand可以用这些偏移工作,你只需要通过--new-consumer选项).


Pav*_*vel 5

您无法设置偏移量,但可以使用kafka-consumer-groups.sh工具“滚动”前馈。

您的连接器的使用者组的名称为connect-*CONNECTOR NAME*,但您可以仔细检查:

unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --list
Run Code Online (Sandbox Code Playgroud)

查看当前偏移量:

unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --group connect-*CONNECTOR NAME* --describe
Run Code Online (Sandbox Code Playgroud)

向前移动偏移:

unset JMX_PORT; ./bin/kafka-console-consumer.sh --bootstrap-server *KAFKA HOSTS* --topic *TOPIC* --max-messages 10000 --consumer-property group.id=connect-*CONNECTOR NAME* > /dev/null
Run Code Online (Sandbox Code Playgroud)

我想您也可以通过首先使用--delete标志删除消费者组来向后移动偏移量。

不要忘记通过 Kafka Connect REST API 暂停和恢复连接器。