Kafka 0.11如何重置偏移量

Cia*_*zka 30 apache-kafka

我正在尝试使用最新的Kafka CLI工具重置消费者偏移.

kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics
Run Code Online (Sandbox Code Playgroud)

结果我看到了这个输出:

TOPIC                            PARTITION  NEW-OFFSET
FirstTopic                       0          0
SecondTopic                      0          0
Run Code Online (Sandbox Code Playgroud)

但再次运行命令:

kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --describe
Run Code Online (Sandbox Code Playgroud)

结果输出:

Consumer group 'my-group' has no active members.

TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
FirstTopic         0          1230            1230            0  
SecondTopic        0          1022            1022            0
Run Code Online (Sandbox Code Playgroud)

我尝试过其他选项,例如重置为显式偏移或直接指定主题但结果相同.输出表明操作成功,同时使用describe命令检查偏移或调试显示偏移未被更改.

任何人都可以在非动物园管理员经纪人中重置消费者抵消.

Mic*_*son 57

默认情况下,--reset-offsets只打印操作的结果.要实际执行您需要添加--execute到命令的操作:

kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group
my-group --reset-offsets --to-earliest --all-topics --execute
Run Code Online (Sandbox Code Playgroud)

  • 重置为特定偏移的任何其他选项,例如上述问题中的FirstTopic为1200?具体来说,我希望我的消费者能够重置到特定的时间点(无论当时的偏移量是多少) (3认同)
  • 我也想知道如何通过Kafka Java API完成这些操作。 (2认同)

aru*_*elu 24

尽管接受的答案完美地回答了 OP 问题,但仍有更多参数可用于重置偏移量。因此,添加此答案以扩展已接受的答案。

将所有主题的偏移量重置为消费者组中最早的

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --to-earliest --all-topics --execute
Run Code Online (Sandbox Code Playgroud)

将特定主题的偏移量重置为消费者组中最早的

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --to-earliest --topic <my-topic> --execute
Run Code Online (Sandbox Code Playgroud)

将特定主题的偏移量重置为消费者组中的特定偏移量

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --to-offset 1000 --topic <my-topic> --execute
Run Code Online (Sandbox Code Playgroud)

其他支持的论点:

--shift-by [正或负整数] - 从给定整数向前或向后移动偏移量。

--to-current--to-latest--to-offset--to-earliest 相同

--to-datetime [日期时间格式为yyyy-MM-ddTHH:mm:ss.xxx ]

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --to-datetime 2017-08-04T00:00:00.000 [ --all-topics or --topic <topic-name> ] --execute
Run Code Online (Sandbox Code Playgroud)

--by -duration [格式为PnDTnHnMnS ]

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --by-duration PT0H10M0S [ --all-topics or --topic <topic-name> ] --execute
Run Code Online (Sandbox Code Playgroud)

重置为按当前时间戳的持续时间偏移。

如何验证?

使用下面的命令来检查当前/偏移量的结束并确认重置是否进行了更改。

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe
Run Code Online (Sandbox Code Playgroud)

示例输出:

Consumer group 'group1' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
intro           0          0               99              99              -               -               -
Run Code Online (Sandbox Code Playgroud)

可能的错误:

错误:只有当组“[group_name]”处于非活动状态,但当前状态为稳定时,才能重置分配。

“稳定”意味着有一个活跃的消费者在为这个组运行。所以首先你必须停止活动的消费者并重试重置偏移量。

如果消费者组有活动的消费者,则无法重置偏移量