再次重新处理/读取 Kafka 记录/消息 - 消费者组偏移重置的目的是什么?

Sye*_*yun 4 apache-kafka kafka-consumer-api

我的 kafka 主题总共有 10 条记录/消息,2 个分区,每个分区有 5 条消息。我的消费者组有 2 个消费者,每个消费者已经分别从他们分配的分区中读取了 5 条消息。现在,我想从开始/开始(偏移量 0)重新处理/读取来自我的主题的消息。

我停止了我的 kafka 消费者并运行以下命令将消费者组偏移量重置为 0。

./kafka-consumer-groups.sh --group cg1 --reset-offsets --to-offset 0 --topic t1 --execute --bootstrap-server "..."
Run Code Online (Sandbox Code Playgroud)

我的期望是,一旦我重新启动我的 kafka 消费者,他们将开始从偏移量 0 即开始读取记录,但这并没有发生,他们从最后一个位置(即偏移量 5)进行轮询。为什么会这样?然后我必须让我的每个消费者明确地寻求偏移 0(开始)以从头开始重新处理/读取记录。在后来的测试周期中,我什至没有运行上面的命令来重置 kafka 消费者组的偏移量。

我的问题是,如果我必须让我的消费者明确地寻求开始让他们再次重新处理/读取消息,那么重置 kafka 消费者组的偏移量的目的是什么?

aru*_*elu 5

处理 Kafka 消费者偏移量有点棘手。消费者程序仅在使用的消费者组没有在内部 Kafka 主题中提交有效偏移量时使用auto.offset.reset配置。(其他支持的偏移存储是 Zookeeper,但内部 Kafka 主题在最新的 Kafka 版本中用作偏移存储)。

考虑以下场景:

  1. 名为 'group1' 的消费者组中的消费者已经消费了来自主题 'testtopic' 的 5 条消息,并且偏移细节被提交到内部 Kafka 主题 - 下次消费者启动时,它不会使用 ' auto.offset.reset ' 配置。相反,它将从存储中获取存储的偏移量,并继续从检索到的偏移量中获取消息。

  2. 名为“group2”的消费者组中的消费者作为新消费者启动,以从“testtopic”获取消息。这是一个新组,内部 Kafka 主题中没有可用的偏移详细信息 -现在使用“ auto.offset.reset ”配置来决定从哪里开始;无论是从主题的开头还是从最新的(只会消耗新消息)。

根据您的问题,问题是重置偏移量的命令不起作用,您必须手动寻找开始和启动消费者。

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> [--topic <topic_name> or --all-topics] --reset-offsets [--to-earliest or --to-offset <offset>] --execute
Run Code Online (Sandbox Code Playgroud)

重置命令不起作用有三种可能。

  1. 日志保留期更短,您尝试重置的偏移量不再可用
  2. 消费者组中的消费者实例正在运行。在这两种情况下,reset offset 命令可能都不起作用。
  3. Kafka 版本 <0.11。重置偏移 API 仅从 Kafka 0.11 可用

从你的问题来看,第一种和第三种情况不太可能。请检查第二种情况。停止任何正在运行的消费者实例,然后尝试重置偏移量。

下面的命令可用于检查消费者组是否有活动的消费者实例。

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe
Run Code Online (Sandbox Code Playgroud)

示例输出:

Consumer group 'group1' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
intro           0          0               99              99 
Run Code Online (Sandbox Code Playgroud)