col*_*sal 18 java kafka-consumer-api
我正在使用KafkaConsumer 0.10 Java api.我想从特定的分区和特定的偏移量消耗.我查了一下,发现有一个搜索方法,但它抛出异常.任何人都有类似的用例或解决方案?
码:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);
Run Code Online (Sandbox Code Playgroud)
例外
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
at xx.xxx.xxx.Test.main(Test.java:182)
Run Code Online (Sandbox Code Playgroud)
Mat*_*Sax 40
在您开始之前,seek()您首先需要向消费者subscribe()提供主题或 主题assign()分区.也请记住,这subscribe()和assign()懒惰-这样,你也需要做一个"虚拟来电",以poll()才可以使用seek().
注意:从Kafka 2.0开始,新的
poll(Duration timeout)是异步的,并且不能保证在poll返回时有完整的赋值.因此,您可能需要在使用前检查您的作业,seek()并poll再次刷新作业.(参见KIP-266了解详情)
如果您使用subscribe(),则使用组管理:因此,您可以使用相同的方式启动多个使用者group.id,并且主题的所有分区将自动分配到组内的所有使用者(每个分区将分配给组中的单个使用者) .
如果要读取特定分区,则需要使用手动分配assign().这允许您进行任何所需的任务.
顺便说一句:KafkaConsumer有一个很长的详细类JavaDoc,包括例子.值得一读.
| 归档时间: |
|
| 查看次数: |
15094 次 |
| 最近记录: |