Tec*_*eek 5 regex apache-kafka kafka-consumer-api spring-kafka
我正在使用来自卡夫卡主题的数据,其中包括区号。我必须仅过滤某些区号的数据。任何人都可以建议解决这个问题的最佳方法吗?
这是我的监听器代码的样子。最佳实践是将数据解析为对象(因为我将有效负载映射到 TEST 对象)并根据我需要过滤的值过滤数据,或者 kafka 是否提供了我可以使用此过滤过程的任何其他库。
Kafka监听器方法
@Service
public class Listener{
@KafkaListener(topics = "#{@topicName}")
public void listen(String payload) throws IOException {
LOGGER.info("received payload from topic='{}'", payload);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
TEST test = objectMapper.readValue(payload,TEST.class);
}
}
Run Code Online (Sandbox Code Playgroud)
我的卡夫卡配置类:
@Configuration
public class Config {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, applicationConfiguration.getKafkaBootStrap());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, applicationConfiguration.getKafkaKeyDeserializer());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, applicationConfiguration.getKafkaValueDeserializer());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, applicationConfiguration.getKafkaGroupId());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, applicationConfiguration.getKafkaAutoOffsetReset());
return properties;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public Listener receiver() {
return new Listener();
}
}
Run Code Online (Sandbox Code Playgroud)
请参阅过滤消息。
Spring for Apache Kafka 项目还通过 FilteringMessageListenerAdapter 类提供了一些帮助,该类可以包装您的 MessageListener。此类采用 RecordFilterStrategy 的实现,您可以在其中实现过滤器方法来指示消息是重复的并且应被丢弃。它有一个名为 ackDiscarded 的附加属性,它指示适配器是否应确认丢弃的记录。默认情况下为 false。
当您使用 时
@KafkaListener,请RecordFilterStrategy在容器工厂上设置(以及可选的 ackDiscarded),以便侦听器包装在适当的过滤适配器中。
/**
* Set the record filter strategy.
* @param recordFilterStrategy the strategy.
*/
public void setRecordFilterStrategy(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy) {
this.recordFilterStrategy = recordFilterStrategy;
}
Run Code Online (Sandbox Code Playgroud)
/**
* Set the record filter strategy.
* @param recordFilterStrategy the strategy.
*/
public void setRecordFilterStrategy(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy) {
this.recordFilterStrategy = recordFilterStrategy;
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
14359 次 |
| 最近记录: |