为什么对 Kafka 的 seekToBeginning 和 seekToEnd API 的调用永远挂起?

Nir*_*jan 3 java apache-kafka

我的环境:

  • 卡夫卡版本: kafka_2.10-0.10.0.0
  • Kafka Java API 版本(客户端): kafka-clients-0.10.0.0

我的配置:

  • 话题: event_notification
  • 分区: 20
  • 客户端消费者线程: 1
  • 消费者组标识: event_cg01
  • 自动提交标志: false

按照要求,我的应用程序,基于标记的启动过程中,我必须设置偏移要么开始结束。为此,我使用以下代码:

final List<PartitionInfo> partitionsInfos = kafkaConsumer.partitionsFor(this.topic);

final List<TopicPartition> assignedPartitions = FluentIterable
      .from(partitionsInfos)                                                             
      .filter(Predicates.notNull())
      .transform(new Function<PartitionInfo, TopicPartition>() {                                                           
          @Override
          public TopicPartition apply(final PartitionInfo input) {                                                                         
            return new TopicPartition(topic, input.partition());
          }
      }).toList();

switch (listenMode) {
case OLDEST:
  kafkaConsumer.seekToBeginning(assignedPartitions);
  break;
case LATEST:
  kafkaConsumer.seekToEnd(assignedPartitions);
  break;
default:
  break;
}
Run Code Online (Sandbox Code Playgroud)

此代码未按预期工作。它永远挂在seekToBeginningseekToEnd电话。

我错过了什么吗?

Bal*_*oja 5

在您可以之前,seek()您首先需要将subscribe()一个主题或一个主题的assign()分区提供给消费者。请注意,subscribe()assign()懒惰的呼叫因此,你也需要做一个“伪呼叫”来poll()之前,你可以使用寻找()或seekToBeginning()seekToEnd()

  • 对 poll() 的虚拟调用是什么样的?是调用轮询零微秒吗?为什么需要虚拟调用? (2认同)