Sye*_*yun 4 apache-kafka kafka-consumer-api
我的 kafka 主题总共有 10 条记录/消息,2 个分区,每个分区有 5 条消息。我的消费者组有 2 个消费者,每个消费者已经分别从他们分配的分区中读取了 5 条消息。现在,我想从开始/开始(偏移量 0)重新处理/读取来自我的主题的消息。
我停止了我的 kafka 消费者并运行以下命令将消费者组偏移量重置为 0。
./kafka-consumer-groups.sh --group cg1 --reset-offsets --to-offset 0 --topic t1 --execute --bootstrap-server "..."
Run Code Online (Sandbox Code Playgroud)
我的期望是,一旦我重新启动我的 kafka 消费者,他们将开始从偏移量 0 即开始读取记录,但这并没有发生,他们从最后一个位置(即偏移量 5)进行轮询。为什么会这样?然后我必须让我的每个消费者明确地寻求偏移 0(开始)以从头开始重新处理/读取记录。在后来的测试周期中,我什至没有运行上面的命令来重置 kafka 消费者组的偏移量。
我的问题是,如果我必须让我的消费者明确地寻求开始让他们再次重新处理/读取消息,那么重置 kafka 消费者组的偏移量的目的是什么?
处理 Kafka 消费者偏移量有点棘手。消费者程序仅在使用的消费者组没有在内部 Kafka 主题中提交有效偏移量时使用auto.offset.reset配置。(其他支持的偏移存储是 Zookeeper,但内部 Kafka 主题在最新的 Kafka 版本中用作偏移存储)。
考虑以下场景:
名为 'group1' 的消费者组中的消费者已经消费了来自主题 'testtopic' 的 5 条消息,并且偏移细节被提交到内部 Kafka 主题 - 下次消费者启动时,它不会使用 ' auto.offset.reset ' 配置。相反,它将从存储中获取存储的偏移量,并继续从检索到的偏移量中获取消息。
名为“group2”的消费者组中的消费者作为新消费者启动,以从“testtopic”获取消息。这是一个新组,内部 Kafka 主题中没有可用的偏移详细信息 -现在使用“ auto.offset.reset ”配置来决定从哪里开始;无论是从主题的开头还是从最新的(只会消耗新消息)。
根据您的问题,问题是重置偏移量的命令不起作用,您必须手动寻找开始和启动消费者。
kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> [--topic <topic_name> or --all-topics] --reset-offsets [--to-earliest or --to-offset <offset>] --execute
Run Code Online (Sandbox Code Playgroud)
重置命令不起作用有三种可能。
从你的问题来看,第一种和第三种情况不太可能。请检查第二种情况。停止任何正在运行的消费者实例,然后尝试重置偏移量。
下面的命令可用于检查消费者组是否有活动的消费者实例。
kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe
Run Code Online (Sandbox Code Playgroud)
示例输出:
Consumer group 'group1' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
intro 0 0 99 99
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4684 次 |
| 最近记录: |