如何在春季Kafka消费消费者之前过滤Kafka消息

app*_*app 3 java spring-kafka

我在项目中使用的是Spring Kafka,我想在消费者根据键和值消费之前过滤消息。

可能吗?

小智 7

添加到@Deadpool 评论。它会正常工作,但不会提交偏移量。所以我们会再次收到相同的消息,但它不会消耗。我们需要在设置factory.setAckDiscarded(true);之前进行设置,factory.setRecordFilterStrategy()以便它会丢弃并提交偏移量。


Dea*_*ool 6

是的,在春季Kafka中,您可以在消费者消费之前过滤消息,接口中有一个接口public interface RecordFilterStrategy<K,V>和方法boolean filter(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> consumerRecord)

因此,您需要重写此filter方法,如果返回false,则使用者将消耗消息,如果返回true,则将不使用消息。

您可以将此过滤应用于消息以及消息值

consumerRecord.key() // will return key of message
consumerRecord.value() // will return the message
Run Code Online (Sandbox Code Playgroud)

示例代码:

 @Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(Integer.parseInt(threads));
    factory.setBatchListener(true);
    factory.setConsumerFactory(kafkaConsumerFactory());
    factory.getContainerProperties().setPollTimeout(Long.parseLong(pollTimeout));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);

    if(true) {
        factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {

            @Override
            public boolean filter(ConsumerRecord<String, String> consumerRecord) {
                if(consumerRecord.key().equals("ETEST")) {
                return false;
                }
            else {
                return true;
                 }
            }   
        });
    }

    return factory;
}
Run Code Online (Sandbox Code Playgroud)

  • 设置 if (true) {...} 的目的是什么 (4认同)