Spring Kafka该类不在受信任的软件包中

ale*_*oid 7 java apache-kafka spring-boot spring-kafka

在库更新之前的我的Spring Boot / Kafka应用程序中,我使用以下类org.telegram.telegrambots.api.objects.Update将消息发布到Kafka主题。现在,我使用以下内容org.telegram.telegrambots.meta.api.objects.Update。如您所见-它们具有不同的软件包。

重新启动应用程序后,我遇到了以下问题:

[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:221) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) ~[kafka-clients-1.1.0.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
Run Code Online (Sandbox Code Playgroud)

这是我的配置:

@EnableAsync
@Configuration
public class ApplicationConfig {

    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

}

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);

        return props;
    }

    @Bean
    public ProducerFactory<String, Update> updateProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Update> updateKafkaTemplate() {
        return new KafkaTemplate<>(updateProducerFactory());
    }

}

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.max.poll.interval.ms}")
    private String kafkaConsumerMaxPollIntervalMs;

    @Value("${kafka.consumer.max.poll.records}")
    private String kafkaConsumerMaxPollRecords;

    @Value("${kafka.topic.telegram.fenix.bot.update.consumer.concurrency}")
    private Integer updateConsumerConcurrency;

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory(kafkaProperties));

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Update> updateConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Update.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Update> updateKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, Update> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(updateConsumerFactory(kafkaProperties));
        factory.setConcurrency(updateConsumerConcurrency);

        return factory;
    }

}
Run Code Online (Sandbox Code Playgroud)

application.properties

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
Run Code Online (Sandbox Code Playgroud)

如何解决此问题并使Kafka将旧消息反序列化为新消息?

更新

这是我的听众

@Component
public class UpdateConsumer {

    @KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
    public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {

        //do some logic here

        ack.acknowledge();
    }

}
Run Code Online (Sandbox Code Playgroud)

Syl*_*are 13

对于这个,有两种方法可以做到,要么在您的解串器中,要么在您的 application.yml 中。

无论是在您的解串器中

在你的解串器中,你在你的DefaultKafkaConsumerFactory(创建你的消费者工厂)中使用。比方说,你想使一个ConsumerFactory<String, Foo> Foo作为模型/ POJO你想使用你的卡夫卡的消息。

你需要addTrustedPackagesJsonDeserializer我在 Kotlin 中有一个例子,但它在 Java 中的语法几乎相同:

 val deserializer = JsonDeserializer<Foo>()
 deserializer.addTrustedPackages("com.example.entity.Foo") // Adding Foo to our trusted packages

 val consumerFactory = DefaultKafkaConsumerFactory(
      consumerConfigs(),  // your consumer config
      StringDeserializer(), 
      deserializer // Using our newly created deserializer
 )
Run Code Online (Sandbox Code Playgroud)

或者在你的 application.yml 中

在您的 application.yml 文件中遵循spring-kafka说明。我们使用com.example.entity.Foo包在受信任的商店中添加 Foo 类:

spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: "com.example.entity.Foo"
Run Code Online (Sandbox Code Playgroud)

spring.json.trusted.packages接受封装的阵列。您可以指定一个类包,或*用于任何包。在这种情况下,你并不需要你通过deserializerDefaultKafkaConsumerFactory()仅在消费者配置。

  • 第一种方法在我的测试中帮助了我,其中消费者是手动创建的。谢谢。 (2认同)

小智 12

有两个关键点需要提及。

  1. 生产者和消费者有两个独立的项目。
  2. 然后发送消息(值)是一个对象类型而不是原始类型。

问题是生产消息对象在消费者端不可用,因为它们是两个独立的项目。

两个克服这个问题请按照下面提到的步骤在 Spring boot Producer 和 Consumer 应用程序中。

----生产者应用程序 -------------

** 生产者配置类 **

import com.kafka.producer.models.Container;    
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

@Bean
public ProducerFactory<String, Container> producerFactory(){

    Map<String, Object> config = new HashMap<>();

config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory(config);
}

@Bean
public KafkaTemplate<String, Container> kafkaTemplate(){
    return new KafkaTemplate<>(producerFactory());
}
}
Run Code Online (Sandbox Code Playgroud)

注意:容器是要在 kafka 主题中发布的自定义对象。


** 生产者类 **

import com.kafka.producer.models.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class Producer {

private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "final-topic";

@Autowired
private KafkaTemplate<String, Container> kafkaTemplate;

public void sendUserMessage(Container msg) {
    LOGGER.info(String.format("\n ===== Producing message in JSON ===== \n"+msg));
    Message<Container> message = MessageBuilder
            .withPayload(msg)
            .setHeader(KafkaHeaders.TOPIC, TOPIC)
            .build();
    this.kafkaTemplate.send(message);
}
}
Run Code Online (Sandbox Code Playgroud)

** 生产者控制器 **

import com.kafka.producer.models.Container;
import com.kafka.producer.services.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/message")
public class MessageController {

@Autowired
private Producer producer;

@PostMapping(value = "/publish")
public String sendMessageToKafkaTopic(@RequestBody Container containerMsg) {
    this.producer.sendUserMessage(containerMsg);
    return "Successfully Published !!";
}
}
Run Code Online (Sandbox Code Playgroud)

注意:类型为 Container 的消息将作为 JSON 消息发布到 kafka 主题名称 :final-topic。

================================================== ==============================

-- 消费者应用程序 --

** 配置类 **

import com.kafka.consumer.models.Container;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerOneConfig {

@Bean
public ConsumerFactory<String, Container> consumerFactory(){
    JsonDeserializer<Container> deserializer = new JsonDeserializer<>(Container.class);
    deserializer.setRemoveTypeHeaders(false);
    deserializer.addTrustedPackages("*");
    deserializer.setUseTypeMapperForKey(true);

    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_one");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Container> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, Container> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;

}
}
Run Code Online (Sandbox Code Playgroud)

注意:在这里您可以看到,我们必须使用自定义 JsonDeserializer 来使用来自 final-topic(topic name) 的容器对象类型 Json 消息,而不是使用默认的 JsonDeserializer()。


** 消费者服务 **

import com.kafka.consumer.models.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.io.IOException;

@Service
public class ConsumerOne {

private final Logger LOGGER = LoggerFactory.getLogger(ConsumerOne.class);

@KafkaListener(topics = "final-topic", groupId = "group_one", containerFactory = "kafkaListenerContainerFactory")
public void consumeUserMessage(@Payload Container msg, @Headers MessageHeaders headers) throws IOException {
    System.out.println("received data in Consumer One ="+ msg.getMessageTypes());
}
}
Run Code Online (Sandbox Code Playgroud)

  • 错误“配置 value.deserializer 的值 org.springframework.kafka.support.serializer.JsonDeserializer@279489ac 无效:需要类实例或类名。”到达此处时:“config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);”看来我必须使用工厂而不是实例。 (2认同)

jum*_*key 12

jsonDeserializer.addTrustedPackages("*");
Run Code Online (Sandbox Code Playgroud)

解决了我的 spring-kafka-2.2.8 问题。


要将其添加到application.properties

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
Run Code Online (Sandbox Code Playgroud)

重要的提示:

如果您分别为 KafkaConsumer 和 KafkaProducer 提供了 Serializer 和 Deserializer 实例,则它们不起作用。

参考文献:
[1] https://docs.spring.io/spring-kafka/reference/html/#json-serde
[2] https://github.com/spring-projects/spring-kafka/issues/535


mar*_*ste 6

我也遇到过这个问题,但是上面的解决方案对我不起作用。但是,诀窍是按如下方式配置 kafka 消费者工厂:

props.put(JsonDeserializer.TRUSTED_PACKAGES, "your.package.name");
Run Code Online (Sandbox Code Playgroud)


mi_*_*_mo 6

当我想在消费者应用程序中使用消息时,我遇到了类似的问题,我收到了 2 个错误:

1-The class 'someClass' is not in the trusted packages: [java.util, java.lang,If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*)

2-org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition partion at offset 9902. If needed, please seek past the record to continue consumption.

我可以通过将此属性(JsonDeserializer.TRUSTED_PACKAGE)添加到 KafkaConfig 类的 kafka 消费者配置生成器方法(makeConfig)中来解决这个问题,以便使用这种方法使用配置我的问题已经解决:

private Map<String, Object> makeConfig(ServiceMessagePriority input)
{
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
    configProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    configProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.core.model.ServiceMsgDTO");
    configProps.put(JsonDeserializer.USE_TYPE_INFO_HEADERS,false);
    configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return configProps;
}
Run Code Online (Sandbox Code Playgroud)


Gar*_*ell 5

请参阅文档

从版本2.1开始,类型信息可以在记录标题中传递,从而允许处理多种类型。另外,可以使用Kafka属性配置序列化器/解串器。

JsonSerializer.ADD_TYPE_INFO_HEADERS(默认为true); 设置为false可禁用JsonSerializer的此功能(设置addTypeInfo属性)。

JsonDeserializer.KEY_DEFAULT_TYPE; 如果不存在标题信息,则用于密钥反序列化的后备类型。

JsonDeserializer.VALUE_DEFAULT_TYPE; 如果没有标题信息,则为值的反序列化的后备类型。

JsonDeserializer.TRUSTED_PACKAGES(默认java.util,java.lang); 逗号分隔的允许反序列化的软件包模式列表;*表示全部反序列化。

默认情况下,序列化程序会将类型信息添加到标头中。

请参阅引导文档

同样,您可以禁用JsonSerializer在标题中发送类型信息的默认行为:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.properties.spring.json.add.type.headers=false

或者,您可以将类型映射添加到入站消息转换器中,以将源类型映射到目标类型。

编辑

话虽如此,您使用的是哪个版本?

  • 我的意思是哪个版本的spring-kafka。您将需要执行我提出的建议之一 - 抑制来自发送方的类型标头或将映射添加到解串器中的类型映射器。类型标头优先于传递到反序列化器构造函数中的目标类型。`new JsonDeserializer&lt;&gt;(Update.class)`。我们可能应该向反序列化器添加一个布尔值,以允许使用提供的类型,即使标头存在。 (2认同)
  • 不; 您在生产者端需要`spring.kafka. Producer.properties.spring.json.add.type.headers = false` - 但您将需要在消费者端进行类型映射以读取任何已经具有标头的现有消息(除非您可以使用您的旧应用程序版本来使用它们)。请参阅反序列化器上的“setTypeMapper”和“DefaultJackson2JavaTypeMapper”上的“setIdClassMapping()”。您需要将源类名称映射到目标类。 (2认同)
  • 我打开了 [一个针对这个问题的问题](https://github.com/spring-projects/spring-kafka/issues/768)。 (2认同)