我的环境:
kafka_2.10-0.10.0.0kafka-clients-0.10.0.0我的配置:
event_notification201event_cg01false按照要求,我的应用程序,基于标记的启动过程中,我必须设置偏移要么开始或结束。为此,我使用以下代码:
final List<PartitionInfo> partitionsInfos = kafkaConsumer.partitionsFor(this.topic);
final List<TopicPartition> assignedPartitions = FluentIterable
.from(partitionsInfos)
.filter(Predicates.notNull())
.transform(new Function<PartitionInfo, TopicPartition>() {
@Override
public TopicPartition apply(final PartitionInfo input) {
return new TopicPartition(topic, input.partition());
}
}).toList();
switch (listenMode) {
case OLDEST:
kafkaConsumer.seekToBeginning(assignedPartitions);
break;
case LATEST:
kafkaConsumer.seekToEnd(assignedPartitions);
break;
default:
break;
}
Run Code Online (Sandbox Code Playgroud)
此代码未按预期工作。它永远挂在seekToBeginning和seekToEnd电话。
我错过了什么吗?
在您可以之前,seek()您首先需要将subscribe()一个主题或一个主题的assign()分区提供给消费者。请注意,subscribe()和assign()懒惰的呼叫因此,你也需要做一个“伪呼叫”来poll()之前,你可以使用寻找()或seekToBeginning()或seekToEnd()。