小智 7
添加到@Deadpool 评论。它会正常工作,但不会提交偏移量。所以我们会再次收到相同的消息,但它不会消耗。我们需要在设置factory.setAckDiscarded(true);之前进行设置,factory.setRecordFilterStrategy()以便它会丢弃并提交偏移量。
是的,在春季Kafka中,您可以在消费者消费之前过滤消息,接口中有一个接口public interface RecordFilterStrategy<K,V>和方法boolean filter(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> consumerRecord)
因此,您需要重写此filter方法,如果返回false,则使用者将消耗消息,如果返回true,则将不使用消息。
您可以将此过滤应用于消息以及消息值
consumerRecord.key() // will return key of message
consumerRecord.value() // will return the message
Run Code Online (Sandbox Code Playgroud)
示例代码:
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(Integer.parseInt(threads));
factory.setBatchListener(true);
factory.setConsumerFactory(kafkaConsumerFactory());
factory.getContainerProperties().setPollTimeout(Long.parseLong(pollTimeout));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
if(true) {
factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
@Override
public boolean filter(ConsumerRecord<String, String> consumerRecord) {
if(consumerRecord.key().equals("ETEST")) {
return false;
}
else {
return true;
}
}
});
}
return factory;
}
Run Code Online (Sandbox Code Playgroud)