如何删除某个特定主题的一组消费者偏移量

Tob*_*ann 5 apache-kafka

假设我有两个主题(都有两个分区和无限保留):

  • my_topic_a
  • my_topic_b

和一个消费者群体:

  • my_consumer

在某些时候,它正在消耗这两个主题,但由于一些变化,它不再对 感兴趣my_topic_a,因此它停止消耗它,现在正在累积滞后:

kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
Run Code Online (Sandbox Code Playgroud)
TOPIC                                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                  HOST            CLIENT-ID
my_topic_a                           0          300000          400000          100000          -                                                            -               -
my_topic_a                           1          300000          400000          100000          -                                                            -               -
my_topic_b                           0          500000          500000          0               -                                                            -               -
my_topic_b                           1          500000          500000          0               -                                                            -               -
Run Code Online (Sandbox Code Playgroud)

这种滞后让我很恼火,因为:

  • 我在 Grafana 中的消费者滞后图被污染了。
  • 自动警报被触发,提醒我消费者滞后太多。

因此,我想摆脱my_topic_aof的偏移量my_consumer,达到好像my_consumer从未消耗过的状态my_topic_a

以下尝试失败:

TOPIC                                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                  HOST            CLIENT-ID
my_topic_a                           0          300000          400000          100000          -                                                            -               -
my_topic_a                           1          300000          400000          100000          -                                                            -               -
my_topic_b                           0          500000          500000          0               -                                                            -               -
my_topic_b                           1          500000          500000          0               -                                                            -               -
Run Code Online (Sandbox Code Playgroud)

有了这个输出:

The consumer does not support topic-specific offset deletion from a consumer group.
Run Code Online (Sandbox Code Playgroud)

我怎样才能实现我的目标?(在我的用例中,暂时停止该组的所有消费者将是一个可行的选择。)

(我使用的是 Kafka 版本2.2.0。)


我的猜测是,可以通过在 topic 上写一些东西来完成一些事情__consumer_offsets,但我不知道它会是什么。目前,该主题如下所示(再次简化):

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer_group --delete --topic domain.user
Run Code Online (Sandbox Code Playgroud)
...
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
Run Code Online (Sandbox Code Playgroud)

Tob*_*ann 4

与此同时(Kafka 2.8),使用 的新--delete-offsets参数已经成为可能kafka-consumer-groups.sh。:-)