假设我有两个主题(都有两个分区和无限保留):
my_topic_amy_topic_b和一个消费者群体:
my_consumer在某些时候,它正在消耗这两个主题,但由于一些变化,它不再对 感兴趣my_topic_a,因此它停止消耗它,现在正在累积滞后:
kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
Run Code Online (Sandbox Code Playgroud)
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my_topic_a 0 300000 400000 100000 - - -
my_topic_a 1 300000 400000 100000 - - -
my_topic_b 0 500000 500000 0 - - -
my_topic_b 1 500000 500000 0 - - -
Run Code Online (Sandbox Code Playgroud)
这种滞后让我很恼火,因为:
因此,我想摆脱my_topic_aof的偏移量my_consumer,达到好像my_consumer从未消耗过的状态my_topic_a。
以下尝试失败:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my_topic_a 0 300000 400000 100000 - - -
my_topic_a 1 300000 400000 100000 - - -
my_topic_b 0 500000 500000 0 - - -
my_topic_b 1 500000 500000 0 - - -
Run Code Online (Sandbox Code Playgroud)
有了这个输出:
The consumer does not support topic-specific offset deletion from a consumer group.
Run Code Online (Sandbox Code Playgroud)
我怎样才能实现我的目标?(在我的用例中,暂时停止该组的所有消费者将是一个可行的选择。)
(我使用的是 Kafka 版本2.2.0。)
我的猜测是,可以通过在 topic 上写一些东西来完成一些事情__consumer_offsets,但我不知道它会是什么。目前,该主题如下所示(再次简化):
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer_group --delete --topic domain.user
Run Code Online (Sandbox Code Playgroud)
...
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
Run Code Online (Sandbox Code Playgroud)
与此同时(Kafka 2.8),使用 的新--delete-offsets参数已经成为可能kafka-consumer-groups.sh。:-)
| 归档时间: |
|
| 查看次数: |
7182 次 |
| 最近记录: |