相关疑难解决方法(0)

反序列化后如何处理SerializationException

我在Spring Kafka设置中使用Avro和Schema注册表.

我想以某种方式处理SerializationException,反序列化期间可能会抛出.

我找到了以下两个资源:

https://github.com/spring-projects/spring-kafka/issues/164

如何配置spring-kafka忽略错误格式的消息?

这些资源表明我返回null而不是SerializationException在反序列化和监听时抛出KafkaNull.这个解决方案很好用.

但是,我希望能够抛出异常而不是返回null.

KIP-161KIP-210为处理异常提供了更好的功能.我确实在Spring Cloud中找到了一些提到KIP-161的资源,但没有具体说明Spring-Kafka.

有谁知道如何捕获SerializationExceptionSpring Boot?

我使用的是Spring Boot 2.0.2

编辑:我找到了解决方案.

我宁愿抛出异常并抓住它而不是返回null或KafkaNull.我在多个不同的项目中使用我的自定义Avro序列化器和反序列化器,其中一些不是Spring.如果我更改了我的Avro序列化程序和反序列化程序,则需要更改其他一些项目以期望反序列化程序返回null.

我想关闭容器,这样我就不会丢失任何消息.在生产中永远不应该期望SerializationException.只有在Schema Registry关闭或者以某种方式将未格式化的消息发送到生产kafka时,才能发生SerializationException.无论哪种方式,SerializationException应该只发生很少,如果它发生,那么我想关闭容器,以便没有消息丢失,我可以调查问题.

只需考虑将从您的消费者容器中捕获所有异常.在我的具体情况下,我只想关闭它,如果它是一个SerializationException

public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                       MessageListenerContainer container) {

        //Only call super if the exception is SerializationException
        if (thrownException instanceof SerializationException) {
            //This will shutdown the container.
            super.handle(thrownException, records, consumer, container);
        } else {
            //Wrap …
Run Code Online (Sandbox Code Playgroud)

error-handling serialization deserialization spring-kafka

9
推荐指数
1
解决办法
3077
查看次数