Usr*_*Usr 8 java apache-kafka json-deserialization spring-boot
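I know this question is very common, but after following several suggested solutions I still could not find one that works. I want to deserialize both strings and objects of my custom class when receiving messages from Kafka. With String everything works fine, but not with my class. I added the trusted package to the consumer configuration (com.springmiddleware.entities is the package that contains my class):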
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
    return props;
}
I have this in my application.yml file:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'com.springmiddleware.entities'
And I added these lines to application.properties:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.springmiddleware.entities
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
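But the following error keeps showing up:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic2-0 at offset 1. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.springmiddleware.entities.Crime' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
Update
Receiver configuration: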
@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Update 2
Listener Class (Receiver):
@KafkaListener(topics = "${app.topic.foo}")
@Service
public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaHandler
    public void listen(@Payload Crime message) {
        System.out.println("Received " + message);
    }

    @KafkaHandler
    public void listen(@Payload String message) {
        System.out.println("Received " + message);
    }
}
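Just use the overloaded JsonDeserializer constructor.
Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that take a boolean useHeadersIfPresent (which is true by default).
The following example shows how to do so: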
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
Your code:
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Object.class, false));
}
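To make the effect of the useHeadersIfPresent flag concrete, here is a minimal plain-Java sketch of the decision it controls. It is a simplified model for illustration only, not Spring Kafka's actual implementation: the class TypeResolutionSketch and the method resolveTargetType are hypothetical names, and the trusted-package set simply mirrors the defaults listed in the error message from the question.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Simplified model for illustration; this is NOT Spring Kafka's actual code.
class TypeResolutionSketch {

    // Default trusted packages, taken from the error message in the question.
    static final Set<String> TRUSTED_PACKAGES =
            new HashSet<>(Arrays.asList("java.util", "java.lang"));

    // Returns the name of the class the JSON payload would be mapped to.
    // typeHeader may be null when the producer sent no type information.
    static String resolveTargetType(String typeHeader, String targetType,
                                    boolean useHeadersIfPresent) {
        if (useHeadersIfPresent && typeHeader != null) {
            int lastDot = typeHeader.lastIndexOf('.');
            String pkg = lastDot < 0 ? "" : typeHeader.substring(0, lastDot);
            if (!TRUSTED_PACKAGES.contains(pkg)) {
                throw new IllegalArgumentException("The class '" + typeHeader
                        + "' is not in the trusted packages");
            }
            return typeHeader;
        }
        // Header ignored (or absent): fall back to the supplied target type.
        return targetType;
    }

    public static void main(String[] args) {
        String crime = "com.springmiddleware.entities.Crime";
        // Header ignored: the target type is used and no trust check runs.
        System.out.println(resolveTargetType(crime, crime, false));
        try {
            // Header honoured: the untrusted package is rejected.
            resolveTargetType(crime, "java.lang.Object", true);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In this model, passing false means the type header is never consulted, so the trusted-packages check that produced the exception is never reached.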
Now use @KafkaListener at the class level:
@KafkaListener(topics = "myTopic")
@Service
public class MultiListenerBean {

    @KafkaHandler
    public void listen(Cat cat) {
        ...
    }

    @KafkaHandler
    public void listen(Hat hat) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void delete(Object obj) {
        ...
    }
}
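The idea behind a class-level listener with several @KafkaHandler methods is that each message goes to the handler whose parameter type matches the payload, with the isDefault handler as a fallback. The following plain-Java sketch models that idea only; it is not how Spring Kafka is implemented. HandlerDispatchSketch, register and dispatch are hypothetical names, and Cat and Hat are empty stand-ins for the payload classes above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model for illustration; this is NOT how Spring Kafka is implemented.
class HandlerDispatchSketch {

    // Hypothetical stand-ins for the payload classes used by the listener.
    static class Cat {}
    static class Hat {}

    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();
    private final Function<Object, String> defaultHandler;

    HandlerDispatchSketch(Function<Object, String> defaultHandler) {
        this.defaultHandler = defaultHandler;
    }

    // Registers a handler for one payload type (one @KafkaHandler method in the model).
    <T> void register(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
    }

    // Picks the first handler whose type matches, else the default handler.
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        return defaultHandler.apply(payload);
    }

    public static void main(String[] args) {
        HandlerDispatchSketch dispatcher = new HandlerDispatchSketch(obj -> "default handler");
        dispatcher.register(Cat.class, cat -> "cat handler");
        dispatcher.register(Hat.class, hat -> "hat handler");

        System.out.println(dispatcher.dispatch(new Cat()));  // prints "cat handler"
        System.out.println(dispatcher.dispatch(new Hat()));  // prints "hat handler"
        System.out.println(dispatcher.dispatch("unknown"));  // prints "default handler"
    }
}
```

In this model the handler is chosen from the runtime type of the already-deserialized payload, which suggests that the type the deserializer produces determines which handler method receives a message.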