spring Kafka模型不在可信包中

app*_*app 2 java spring apache-kafka spring-boot spring-kafka

spring-Kafka-2.1.5我正在与和 一起研究微服务spring-boot-2.0.5

第一个服务将向卡夫卡产生一些消息,第二个服务将消耗它们,在消耗时我遇到了问题

Caused by: java.lang.IllegalArgumentException: The class 'com.service1.model.TopicMessage' is not in the trusted packages: [java.util, java.lang, com.service2.model.ConsumeMessage]. 
If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
Run Code Online (Sandbox Code Playgroud)

所以从错误消息来看,这是com.service1.model.TopicMessageservice1的序列化模型。但我正在尝试将消息反序列化为com.service2.model.ConsumeMessageservice2 中的模型并遇到此问题

我在这里发现了同样的问题,并尝试了以下格式以及文档文档

下面是我的配置

  @Bean(name = "kafkaConsumerConfig")
   public Map<String, Object> kafkaConsumerConfig() {

    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetconfig);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);

     }
Run Code Online (Sandbox Code Playgroud)

kafka消费者工厂

@Bean(name = "kafkaConsumerFactory")
public ConsumerFactory<String, ConsumeMessage> kafkaConsumerFactory()        {

    JsonDeserializer<ConsumeMessage> 
    deserializer = new JsonDeserializer<>();
    deserializer.addTrustedPackages("com.service2.model");
return new DefaultKafkaConsumerFactory<String, ConsumeMessage>(kafkaConsumerConfig(),new StringDeserializer(),deserializer);

}
Run Code Online (Sandbox Code Playgroud)

kafkaListenerContainerFactory

 @Bean(name = "kafkaListenerContainerFactory")
 public ConcurrentKafkaListenerContainerFactory<String, ConsumeMessage > kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, ConsumeMessage > factory = new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConcurrency(Integer.parseInt(threads));
    factory.setBatchListener(true);
    factory.setConsumerFactory(kafkaConsumerFactory());
    factory.getContainerProperties().setPollTimeout(Long.parseLong(pollTimeout));
    factory.getContainerProperties().setAckMode(AckMode.BATCH);

    return factory;
}
Run Code Online (Sandbox Code Playgroud)

Art*_*lan 6

您需要header precedence在解串器中禁用:

/**
 * Construct an instance with the provided target type, and
 * useHeadersIfPresent with a default {@link ObjectMapper}.
 * @param targetType the target type.
 * @param useHeadersIfPresent true to use headers if present and fall back to target
 * type if not.
 * @since 2.2
 */
public JsonDeserializer(Class<? super T> targetType, boolean useHeadersIfPresent) {
Run Code Online (Sandbox Code Playgroud)

useHeadersIfPresent参数必须配置为false。这样,inferred将使用类型并且标头值将被忽略。

如果您不使用,您应该考虑使用类似的逻辑spring-kafka-2.2实现您自己的: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/ springframework/kafka/support/serializer/JsonDeserializer.javaJsonDeserializer