过滤kafka消息的最佳方式应该是什么

Tec*_*eek 5 regex apache-kafka kafka-consumer-api spring-kafka

我正在使用来自卡夫卡主题的数据,其中包括区号。我必须仅过滤某些区号的数据。任何人都可以建议解决这个问题的最佳方法吗?

这是我的监听器代码的样子。最佳实践是将数据解析为对象(因为我将有效负载映射到 TEST 对象)并根据我需要过滤的值过滤数据,或者 kafka 是否提供了我可以使用此过滤过程的任何其他库。

Kafka监听器方法

@Service
public class Listener{

    @KafkaListener(topics = "#{@topicName}")
        public void listen(String payload) throws IOException {

            LOGGER.info("received payload from topic='{}'", payload);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

            TEST test = objectMapper.readValue(payload,TEST.class);

        }
}
Run Code Online (Sandbox Code Playgroud)

我的卡夫卡配置类:

@Configuration
public class Config {


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, applicationConfiguration.getKafkaBootStrap());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, applicationConfiguration.getKafkaKeyDeserializer());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, applicationConfiguration.getKafkaValueDeserializer());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, applicationConfiguration.getKafkaGroupId());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, applicationConfiguration.getKafkaAutoOffsetReset());
        return properties;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
    @Bean
    public Listener receiver() {
        return new Listener();
    }

}
Run Code Online (Sandbox Code Playgroud)

Gar*_*ell 6

请参阅过滤消息

Spring for Apache Kafka 项目还通过 FilteringMessageListenerAdapter 类提供了一些帮助,该类可以包装您的 MessageListener。此类采用 RecordFilterStrategy 的实现,您可以在其中实现过滤器方法来指示消息是重复的并且应被丢弃。它有一个名为 ackDiscarded 的附加属性,它指示适配器是否应确认丢弃的记录。默认情况下为 false。

当您使用 时@KafkaListener,请RecordFilterStrategy在容器工厂上设置(以及可选的 ackDiscarded),以便侦听器包装在适当的过滤适配器中。

/**
 * Set the record filter strategy.
 * @param recordFilterStrategy the strategy.
 */
public void setRecordFilterStrategy(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy) {
    this.recordFilterStrategy = recordFilterStrategy;
}
Run Code Online (Sandbox Code Playgroud)
/**
 * Set the record filter strategy.
 * @param recordFilterStrategy the strategy.
 */
public void setRecordFilterStrategy(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy) {
    this.recordFilterStrategy = recordFilterStrategy;
}
Run Code Online (Sandbox Code Playgroud)

  • 我预计不会有任何显着差异;它只是一种允许关注点分离的机制(过滤与业务逻辑)。 (2认同)