如何在 Kafka Consumer Group 中将偏移量重置为任意值?

mzl*_*zlo 8 apache-kafka

我想将所有分区的偏移量重置为特定值......我看到 kafka-consumer-groups.sh 提供了 --from -file 将偏移量重置为 CSV 文件中定义的值的选项

任何人都可以分享这个 csv 文件的内容/格式和它的示例命令吗?

例如: ./kafka_2.12-2.1.1/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKER} --group ${GROUP_NAME} --topic ${TOPIC} --reset-offsets --from-file offsets.csv --execute

offsets.csv 的内容/格式是什么?

war*_*iak 8

csv文件格式为(每一行包含一个分区的信息):

topicName,partitionNumber,offset
topicName,partitionNumber,offset
Run Code Online (Sandbox Code Playgroud)

示例 csv 内容 ( reset-policy.csv)。

someTopic1,0,1
someTopic2,1,5
Run Code Online (Sandbox Code Playgroud)

基于 csv 文件重置偏移量的命令是:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group gr1 --from-file reset-policy.csv --reset-offsets --execute


Ram*_*man 6

@wardzinski 的回答是所要求的关键信息,但我可以添加以下有用的花絮:

您可以使用 的--export命令kafka-consumer-groups从现有信息创建 CSV 文件,而无需通过--dry-run. 例如:

bin/kafka-consumer-groups \
  --bootstrap-server $KAFKA \
  --export --group $GROUP_NAME --topic $TOPIC \
  --reset-offsets --to-current \
  --dry-run
Run Code Online (Sandbox Code Playgroud)

的值--to-current可以被改变为各种其他值,诸如--to-datetime--by-period

该命令的输出是--from-file.

一个非常有用的用例是将偏移量从一个消费者组复制到另一个消费者组,例如:

bin/kafka-consumer-groups \
  --bootstrap-server $KAFKA \
  --export --group $FROM_GROUP_NAME --topic $TOPIC \
  --reset-offsets --to-current \
  --dry-run > offsets.txt

bin/kafka-consumer-groups \
  --bootstrap-server $KAFKA \
  --execute --group $TO_GROUP_NAME \
  --reset-offsets --from-file offsets.txt
Run Code Online (Sandbox Code Playgroud)