Spring Kafka 错误:此错误处理程序无法直接处理“SerializationException”;请考虑配置“ErrorHandlingDeserializer”

Mix*_*age 10 java apache-kafka spring-boot spring-kafka

生产者属性

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
Run Code Online (Sandbox Code Playgroud)

消费属性

spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.group-id=user-group
server.port=8085
Run Code Online (Sandbox Code Playgroud)

消费者服务

@Service
public class UserConsumerService {

    @KafkaListener(topics = { "user-topic" })
    public void consumerUserData(User user) {
        System.out.println("Users Age Is: " + user.getAge() + " Fav Genre " + user.getFavGenre());
    }
}
Run Code Online (Sandbox Code Playgroud)

生产者服务

@Service
public class UserProducerService {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendUserData(User user) {
        kafkaTemplate.send("user-topic", user.getName(), user);
    }
}
Run Code Online (Sandbox Code Playgroud)

用于创建主题的生产者配置

    @Configuration public class KafkaConfig {
    
        @Bean
        public NewTopic topicOrder() {
            return TopicBuilder.name("user-topic").partitions(2).replicas(1).build();
        } 
}
Run Code Online (Sandbox Code Playgroud)

生产者工作得很好,但消费者给出了类似的错误

2021-12-06 21:45:50.299 ERROR 4936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 
'ErrorHandlingDeserializer' in the value and/or key deserializer
    at
org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149)
~[spring-kafka-2.8.0.jar:2.8.0] DefaultErrorHandler.java:149
    at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1760)
~[spring-kafka-2.8.0.jar:2.8.0]
KafkaMessageListenerContainer.java:1760
    at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1283)
~[spring-kafka-2.8.0.jar:2.8.0]
KafkaMessageListenerContainer.java:1283
    at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
~[na:na] Executors.java:539
    at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
~[na:na] FutureTask.java:264
    at
java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Thread.java:833 Caused by:
org.apache.kafka.common.errors.RecordDeserializationException: Error
deserializing key/value for partition user-topic-0 at offset 1. If
needed, please seek past the record to continue consumption.
    at
org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)
~[kafka-clients-3.0.0.jar:na] Fetcher.java:1429
    at
org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134)
~[kafka-clients-3.0.0.jar:na] Fetcher.java:134
    at
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652)
~[kafka-clients-3.0.0.jar:na] Fetcher.java:1652
    at
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488)
~[kafka-clients-3.0.0.jar:na] Fetcher.java:1488
    at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721)
~[kafka-clients-3.0.0.jar:na] Fetcher.java:721
    at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672)
~[kafka-clients-3.0.0.jar:na] Fetcher.java:672
    at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277)
~[kafka-clients-3.0.0.jar:na] KafkaConsumer.java:1277
    at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
~[kafka-clients-3.0.0.jar:na] KafkaConsumer.java:1238
    at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
~[kafka-clients-3.0.0.jar:na] KafkaConsumer.java:1211
    at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1507)
~[spring-kafka-2.8.0.jar:2.8.0]
KafkaMessageListenerContainer.java:1507
    at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1497)
~[spring-kafka-2.8.0.jar:2.8.0]
KafkaMessageListenerContainer.java:1497
    at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1325)
~[spring-kafka-2.8.0.jar:2.8.0] KafkaMessage
Run Code Online (Sandbox Code Playgroud)

我是卡夫卡新手,试图找出为什么会出现此错误。我该如何解决?

Gar*_*ell 20

错误消息没有告诉您任何信息吗?

\n
\n

此错误处理程序无法直接处理“SerializationException”;请考虑在值和/或键反序列化器中配置“ErrorHandlingDeserializer”

\n
\n

请参阅文档

\n
\n

当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在poll()返回之前。为了解决这个问题,ErrorHandlingDeserializer引入了。该解串器委托给真正的解串器(键或值)。如果委托无法反序列化记录内容,则会在包含原因和原始字节的标头中ErrorHandlingDeserializer返回一个null值和 a 。DeserializationException当您使用记录级 时MessageListener,如果ConsumerRecord包含DeserializationException键或值的标头,ErrorHandler则会使用失败的 来调用container\xe2\x80\x99s ConsumerRecord。记录不会传递给侦听器。

\n

[\xe2\x80\xa6]

\n

您可以使用DefaultKafkaConsumerFactory接受键和值Deserializer对象的构造函数,并连接到ErrorHandlingDeserializer您已使用适当委托配置的适当实例中。或者,您可以使用使用者配置属性(由ErrorHandlingDeserializer)来实例化委托。属性名称是ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASSErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS。属性值可以是类或类名。以下示例展示了如何设置这些属性:

\n
... // other props\nprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);\nprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);\nprops.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);\nprops.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")\nprops.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());\nprops.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")\nprops.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")\nreturn new DefaultKafkaConsumerFactory<>(props);\n\n
Run Code Online (Sandbox Code Playgroud)\n
\n

带引导:

\n
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer\nspring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer\n
Run Code Online (Sandbox Code Playgroud)\n