Sha*_*P S 2 java apache-kafka spring-kafka
这个问题适用于Spring Kafka,与Apache Kafka和High Level Consumer相关:跳过损坏的消息
有没有办法配置Spring Kafka使用者跳过无法读取/处理(损坏)的记录?
我看到如果无法反序列化,消费者会陷入同一记录的情况.这是消费者抛出的错误.
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value
Run Code Online (Sandbox Code Playgroud)
消费者轮询主题并在循环中继续打印相同的错误,直到程序被杀死.
在具有以下使用者工厂配置的@KafkaListener中,
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
Run Code Online (Sandbox Code Playgroud)
您需要ErrorHandlingDeserializer
:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-deserializer
如果您无法迁移到该2.2
版本,请考虑实现自己的版本并返回null
那些无法正确反序列化的记录.
源代码在这里:https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java