G.D*_*rov 3 java apache-kafka kafka-consumer-api
我正在尝试实现基本场景,从头开始重新阅读主题(至少 1 条消息),并且我面临意外的行为。
假设有 1 个分区主题恰好容纳 100 万条消息,1 个消费者的偏移量已在中间某处提交,没有活动的生产者。
首先我尝试过
consumer.subscribe(Collections.singletonList(topic));
consumer.seekToBeginning(Collections.emptySet());
consumer.poll(Duration.ofMillis(longTimeout)); //no loop to simplify
Run Code Online (Sandbox Code Playgroud)
这不起作用(没有轮询消息)。我读过这seekToBeginning是懒惰的(而且没关系),但事实证明,seekToBeginning根本没有影响,因为它需要已经分配的分区,只有第一次轮询才会发生这种情况。它应该在文档中描述吗,还是我错过了?
然后我尝试过
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(Duration.ofMillis(assignTimeout));
consumer.seekToBeginning(Collections.emptySet());
consumer.poll(Duration.ofMillis(longTimeout));//no loop to simplify
Run Code Online (Sandbox Code Playgroud)
事实证明,这取决于assignTimeout. 这应该足以完成加入过程。该时间可能会有所不同,并且不可能依赖它。
然后我提供ConsumerRebalanceListener了
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
Run Code Online (Sandbox Code Playgroud)
并单poll左。它似乎终于起作用了。
所以问题是:
seekToBeginning不是之后subscribe就没用了?应该记录下来吗?ConsumerRebalanceListener可靠吗?它是否保证在应用搜索之前不会轮询来自中间(提交偏移量)的消息?小智 5
对于第一个:
seek()您在问题中正确地提到了这一点,即或操作的先决条件seekToXXXX()是需要分配分区。直到我们加入消费者组后才会发生这种情况,并且只有当我们调用 时才会发生这种情况poll()。因此,seek()操作后无法立即进行subscribe()是预期的行为。
这实际上记录在卡夫卡的权威指南,第 4 章卡夫卡消费者,部分 - 使用特定偏移量的记录。
对于第二个问题:
是的,使用ConsumerRebalanceListener是可靠的,并且是 Kafka 权威指南推荐的方法。
以下是同一章中的声明,证实了这一点:
有很多不同的方法来实现一次语义......................,但所有这些方法都需要使用 ConsumerRebalance Listener 和eek()来确保偏移量及时存储,并且消费者开始从正确的位置阅读消息。
希望这可以帮助!
| 归档时间: |
|
| 查看次数: |
3173 次 |
| 最近记录: |