Apache Kafka:查找和分配。从头开始可靠阅读

G.D*_*rov 3 java apache-kafka kafka-consumer-api

我正在尝试实现基本场景,从头开始重新阅读主题(至少 1 条消息),并且我面临意外的行为。

假设有 1 个分区主题恰好容纳 100 万条消息,1 个消费者的偏移量已在中间某处提交,没有活动的生产者。

首先我尝试过

  consumer.subscribe(Collections.singletonList(topic));
  consumer.seekToBeginning(Collections.emptySet());
  consumer.poll(Duration.ofMillis(longTimeout)); //no loop to simplify
Run Code Online (Sandbox Code Playgroud)

这不起作用(没有轮询消息)。我读过这seekToBeginning是懒惰的(而且没关系),但事实证明,seekToBeginning根本没有影响,因为它需要已经分配的分区,只有第一次轮询才会发生这种情况。它应该在文档中描述吗,还是我错过了?

然后我尝试过

  consumer.subscribe(Collections.singletonList(topic));
  consumer.poll(Duration.ofMillis(assignTimeout));
  consumer.seekToBeginning(Collections.emptySet());
  consumer.poll(Duration.ofMillis(longTimeout));//no loop to simplify
Run Code Online (Sandbox Code Playgroud)

事实证明,这取决于assignTimeout. 这应该足以完成加入过程。该时间可能会有所不同,并且不可能依赖它。

然后我提供ConsumerRebalanceListener

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      consumer.seekToBeginning(partitions);
    }
Run Code Online (Sandbox Code Playgroud)

并单poll左。它似乎终于起作用了。

所以问题是:

  1. seekToBeginning不是之后subscribe就没用了?应该记录下来吗?
  2. 解决方案ConsumerRebalanceListener可靠吗?它是否保证在应用搜索之前不会轮询来自中间(提交偏移量)的消息?

小智 5

对于第一个

seek()您在问题中正确地提到了这一点,即或操作的先决条件seekToXXXX()是需要分配分区。直到我们加入消费者组后才会发生这种情况,并且只有当我们调用 时才会发生这种情况poll()。因此,seek()操作后无法立即进行subscribe()是预期的行为。

这实际上记录在卡夫卡的权威指南,第 4 章卡夫卡消费者,部分 - 使用特定偏移量的记录。

对于第二个问题

是的,使用ConsumerRebalanceListener是可靠的,并且是 Kafka 权威指南推荐的方法。

以下是同一章中的声明,证实了这一点:

有很多不同的方法来实现一次语义......................,但所有这些方法都需要使用 ConsumerRebalance Listener 和eek()来确保偏移量及时存储,并且消费者开始从正确的位置阅读消息。

希望这可以帮助!