KafkaConsumer 0.10 Java API错误消息:没有分区的当前分配

col*_*sal 18 java kafka-consumer-api

我正在使用KafkaConsumer 0.10 Java api.我想从特定的分区和特定的偏移量消耗.我查了一下,发现有一个搜索方法,但它抛出异常.任何人都有类似的用例或解决方案?

码:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);
Run Code Online (Sandbox Code Playgroud)

例外

java.lang.IllegalStateException: No current assignment for partition mytopic-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
    at xx.xxx.xxx.Test.main(Test.java:182)
Run Code Online (Sandbox Code Playgroud)

Mat*_*Sax 40

在您开始之前,seek()您首先需要向消费者subscribe()提供主题 主题assign()分区.也请记住,这subscribe()assign()懒惰-这样,你也需要做一个"虚拟来电",以poll()才可以使用seek().

注意:从Kafka 2.0开始,新的poll(Duration timeout)是异步的,并且不能保证在poll返回时有完整的赋值.因此,您可能需要在使用前检查您的作业,seek()poll再次刷新作业.(参见KIP-266了解详情)

如果您使用subscribe(),则使用组管理:因此,您可以使用相同的方式启动多个使用者group.id,并且主题的所有分区将自动分配到组内的所有使用者(每个分区将分配给组中的单个使用者) .

如果要读取特定分区,则需要使用手动分配assign().这允许您进行任何所需的任务.

顺便说一句:KafkaConsumer有一个很长的详细类JavaDoc,包括例子.值得一读.

  • 我想你的意思是 `group.id` 而不是 `application.id` (2认同)
  • 我猜对于assign()没关系.但是对于`subscribe()`,只要你不调用`poll()`你没有加入消费者组,你没有分配任何分区.因此,您将最终得到一个例外,如问题所示. (2认同)