Ali*_*der 4 java spring apache-kafka kafka-consumer-api spring-kafka
我使用Spring Kafka API实现Kafka使用者手动偏移管理:
@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
if (someCondition) {
acknowledgment.acknowledge();
}
}
Run Code Online (Sandbox Code Playgroud)
在这里,我希望消费者只有在someCondition持有时才提交偏移量.否则,消费者应该睡一段时间并再次阅读相同的消息.
卡夫卡配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
factory.getContainerProperties().setAckMode(MANUAL);
return factory;
}
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
...
return props;
}
Run Code Online (Sandbox Code Playgroud)
使用当前配置,如果someCondition == false,consumer不提交偏移量,但仍然读取下一条消息.如果acknowledgement没有执行Kafka,有没有办法让消费者重读一条消息?
正如@Gary已经指出的那样,你是在正确的方向,seek()是这样做的方式.当我遇到这个问题时,我今天找不到它的代码示例.这是任何想要解决问题的人的代码.
public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {
private ConsumerSeekCallback consumerSeekCallback;
@Override
public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
if (/*some condition*/) {
//process
acknowledgment.acknowledge(); //send ack
} else {
consumerSeekCallback.seek("your.topic", record.partition(), record.offset());
}
}
@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
this.consumerSeekCallback = consumerSeekCallback;
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
// nothing is needed here for this program
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
// nothing is needed here for this program
}
}
Run Code Online (Sandbox Code Playgroud)