Adi*_*ari 4 java apache-kafka microservices kafka-consumer-api
需要从 Kafka 主题的特定偏移量获取消息
导致 IllegalStateException 异常卡在assign()
如果我不使用assign(),那么消费者不会执行搜索,因为这是一个惰性操作
实际目的:需要从预先确定的偏移量到结束迭代主题中的消息。该预先确定的偏移量计算为markOffset()
static void fetchMessagesFromMarkedOffset() {
Consumer<Long, String> consumer = ConsumerCreator.createConsumer();
consumer.assign(set); // <---- Exception at this place
map.forEach((k,v) -> {
consumer.seek(k, v-3);
});
ConsumerRecords<Long, String> consumerRecords = consumer.poll(100);
consumerRecords.forEach(record -> {
System.out.println("Record Key " + record.key());
System.out.println("Record value " + record.value());
System.out.println("Record partition " + record.partition());
System.out.println("Record offset " + record.offset());
});
consumer.close();
}
Run Code Online (Sandbox Code Playgroud)
涉及的其余相关代码
public static Set<TopicPartition> set;
public static Map<TopicPartition, Long> map;
static void markOffset() {
Consumer<Long, String> consumer = ConsumerCreator.createConsumer();
consumer.poll(100);
set = consumer.assignment();
map = consumer.endOffsets(set);
System.out.println("Topic Partitions: " + set);
System.out.println("End Offsets: " + map);
}
Run Code Online (Sandbox Code Playgroud)
消费者创造
private Consumer createConsumer(String topicName) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "capacity-service-application");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
final Consumer consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList(topicName));
return consumer;
}
Run Code Online (Sandbox Code Playgroud)
例外
Exception in thread "main" java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
at org.apache.kafka.clients.consumer.internals.SubscriptionState.setSubscriptionType(SubscriptionState.java:104)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromUser(SubscriptionState.java:157)
at org.apache.kafka.clients.consumer.KafkaConsumer.assign(KafkaConsumer.java:1064)
at com.gaurav.kafka.App.fetchMessagesFromMarkedOffset(App.java:44)
at com.gaurav.kafka.App.main(App.java:30)
Run Code Online (Sandbox Code Playgroud)
war*_*iak 10
不能混合分配manual和automatic分区分配。您应该使用KafkaConsumer::subscribe或,KafkaConsumer::assign但不能同时使用两者。
如果在致电后KafkaConsumer::subscribe您想切换到manual方法,您应该首先致电KafkaConsumer::unsubscribe。
根据https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
请注意,不可能将手动分区分配(即使用分配)与通过主题订阅的动态分区分配(即使用订阅)混合在一起。
| 归档时间: |
|
| 查看次数: |
6248 次 |
| 最近记录: |