Spring-boot-starter RabbitMQ global error handling

Vel*_*aga 3 rabbitmq spring-rabbit spring-boot rabbitmq-exchange spring-rabbitmq

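I am using spring-boot-starter-amqp 1.4.2. The producer and consumer work fine, but sometimes the incoming JSON messages have incorrect syntax. This results in the following (correct) exception: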

org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
Caused by:  org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token...

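In the future I may face many more exceptions, so I want to configure a global error handler so that an exception in any of the consumers can be handled in one place.

Note: in this case the message never reaches the consumer at all. I want to handle these kinds of exceptions across all consumers.

Please find the code below: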

RabbitConfiguration.java

@Configuration
@EnableRabbit
public class RabbitMqConfiguration {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Bean
    public MessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    @Primary
    public RabbitTemplate rabbitTemplate()
    {
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

}

Consumer

@RabbitListener(
        id = "book_queue",
        bindings = @QueueBinding(
                value = @Queue(value = "book.queue", durable = "true"),
                exchange = @Exchange(value = "book.exchange", durable = "true", delayed = "true"),
                key = "book.queue"
        )
    )
public void handle(Message message) {
//Business Logic
}
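
Can anyone help me set up a global error handler? Any help would be appreciated.

Question updated in response to Gary's comments

I was able to run your example as you described and got the expected output. I wanted to try a few more negative cases based on your example, but there are a few things I do not understand.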

this.template.convertAndSend(queue().getName(), new Foo("bar"));

Output

Received: Foo [foo=bar]

The code above works fine. Now I send a different bean instead of "Foo":

this.template.convertAndSend(queue().getName(), new Differ("snack","Hihi","how are you"));
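
Output

Received: Foo [foo=null]

The consumer should not accept this message, because it is a completely different bean (Differ.class rather than Foo.class), so I expected it to go to the "ConditionalRejectingErrorHandler". Why does it accept the wrong payload and print null? Please correct me if I am wrong.

Edit 1:

Gary, as you suggested, I set the "__TypeId__" header when sending the message, but the consumer is still able to convert the wrong message and no error is thrown. Please find the code below. I used your code sample and made only the following modifications:

1) Added "__TypeId__" when sending the message: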

this.template.convertAndSend(queue().getName(), new Differ("snack","hihi","how are you"),m -> {
        m.getMessageProperties().setHeader("__TypeId__","foo");
        return m;
    }); 

2) Added a "DefaultClassMapper" to the "Jackson2JsonMessageConverter":

@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return new Jackson2JsonMessageConverter();
}    

Gar*_*ell 9

Override Boot's listener container factory bean, as described in Enable Listener Endpoint Annotations.

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setErrorHandler(myErrorHandler());
    ...
    return factory;
}

You can inject a custom implementation of ErrorHandler, which will be added to each listener container the factory creates.

void handleError(Throwable t);
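
The throwable will be a ListenerExecutionFailedException which, starting with version 1.6.7 (Boot 1.4.4), has the raw inbound message in its failedMessage property.

The default error handler considers causes such as MessageConversionException to be fatal (they will not be requeued).

If you wish to retain that behavior (normal for such problems), you should throw an AmqpRejectAndDontRequeueException after handling the error.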
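
As a minimal sketch of that last point, a handler along the following lines logs the failure and then rejects the message without requeueing it. The class name GlobalErrorHandler and the log wording are illustrative assumptions and not part of the original example; the sketch relies only on Spring's ErrorHandler interface and on AmqpRejectAndDontRequeueException.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.util.ErrorHandler;

// Hypothetical class name; register it with factory.setErrorHandler(new GlobalErrorHandler()).
public class GlobalErrorHandler implements ErrorHandler {

    private static final Logger logger = LoggerFactory.getLogger(GlobalErrorHandler.class);

    @Override
    public void handleError(Throwable t) {
        // Central place to handle a failure from any listener (log, alert, persist, ...).
        logger.error("Listener failed", t);
        // Signal the container to reject the message without requeueing it;
        // otherwise the same bad message could be redelivered repeatedly.
        throw new AmqpRejectAndDontRequeueException("Error handled; not requeueing", t);
    }
}
```

Note that this sketch rejects every failed message and makes no distinction between fatal and transient failures, so it is probably only suitable when no failure is worth retrying.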

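By the way, you don't need that RabbitTemplate bean; if there is just one MessageConverter bean in the application, Boot will auto-wire it into the containers and the template.

Since you will be overriding Boot's factory, you will have to wire the converter in there.

EDIT

You can use the default ConditionalRejectingErrorHandler, but inject it with a custom implementation of FatalExceptionStrategy. In fact, you can subclass its DefaultExceptionStrategy and override isFatal(Throwable t); then, after handling the error, return super.isFatal(t).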

EDIT2

Complete example; it sends 1 good message and 1 bad message:

package com.example;

import org.slf4j.Logger;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;

@SpringBootApplication
public class So42215050Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar"));
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties());
        });
        Thread.sleep(5000);
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        System.out.println("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }
}

Result:

Received: Foo [foo=bar]

2017-02-14 09:42:50.972 ERROR 44868 --- [cTaskExecutor-1] 5050Application$MyFatalExceptionStrategy : Failed to process inbound message from queue So42215050; failed message: (Body:'some bad json' MessageProperties [headers={__TypeId__=com.example.So42215050Application$Foo}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=So42215050, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-P2QqY0PMD1ppX5NnkUPhFA, consumerQueue=So42215050])

EDIT3

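JSON does not convey any type information. By default, the type to convert to is inferred from the method parameter type. If you wish to reject anything that cannot be converted to that type, you need to configure the message converter appropriately.

For example: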

@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return converter;
}

Now, when I change my example to send a Bar instead of a Foo...

public static class Bar {

   ...

}

this.template.convertAndSend(queue().getName(), new Bar("baz"));

I get...

Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.So42215050Application$Bar] to [com.example.So42215050Application$Foo] for GenericMessage [payload=Bar [foo=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=So42215050, amqp_contentEncoding=UTF-8, amqp_deliveryTag=3, amqp_consumerQueue=So42215050, amqp_redelivered=false, id=6d7e23a3-c2a7-2417-49c9-69e3335aa485, amqp_consumerTag=amq.ctag-6JIGkpmkrTKaG32KVpf8HQ, contentType=application/json, __TypeId__=com.example.So42215050Application$Bar, timestamp=1488489538017}]

But this only works if the sender sets the __TypeId__ header (which the template does if it is configured with the same converter).

EDIT4

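// Imports as in the EDIT2 example, plus:
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.support.converter.DefaultClassMapper;

@SpringBootApplication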
public class So42215050Application {

    private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar")); // good - converter sets up type
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties()); // fail bad json
        });
        Message message = MessageBuilder
                .withBody("{\"foo\":\"bar\"}".getBytes())
                .andProperties(
                        MessagePropertiesBuilder
                            .newInstance()
                            .setContentType("application/json")
                            .build())
                .build();
        this.template.send(queue().getName(), message); // Success - default Foo class when no header
        message.getMessageProperties().setHeader("__TypeId__", "foo");
        this.template.send(queue().getName(), message); // Success - foo is mapped to Foo
        message.getMessageProperties().setHeader("__TypeId__", "bar");
        this.template.send(queue().getName(), message); // fail - mapped to a Map
        Thread.sleep(5000);
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        logger.info("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setDefaultType(Foo.class);
        Map<String, Class<?>> mappings = new HashMap<>();
        mappings.put("foo", Foo.class);
        mappings.put("bar", Object.class);
        mapper.setIdClassMapping(mappings);
        converter.setClassMapper(mapper);
        return converter;
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }

    public static class Bar {

        private String foo;

        public Bar() {
            super();
        }

        public Bar(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Bar [foo=" + this.foo + "]";
        }

    }

}
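
The three hand-built sends in runDemo() above exercise three different paths through the class mapper: no header, a header with a mapped id, and a header mapped to a type the listener cannot accept. The following dependency-free sketch models that lookup order only as an illustration. The class TypeIdResolutionSketch and the method resolveTargetType are hypothetical names, and the logic is a simplified stand-in, not Spring's actual DefaultClassMapper implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of resolving a target type from a "__TypeId__" header.
public class TypeIdResolutionSketch {

    public static class Foo { }

    public static Class<?> resolveTargetType(Map<String, Object> headers,
            Map<String, Class<?>> idClassMapping, Class<?> defaultType) {
        Object typeId = headers.get("__TypeId__");
        if (typeId == null) {
            // No header: fall back to the configured default type.
            return defaultType;
        }
        Class<?> mapped = idClassMapping.get(typeId.toString());
        if (mapped != null) {
            // Header value is a known id: use the mapped class.
            return mapped;
        }
        try {
            // Otherwise treat the header value as a class name.
            return Class.forName(typeId.toString());
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown type id: " + typeId, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Class<?>> mappings = new HashMap<>();
        mappings.put("foo", Foo.class);
        mappings.put("bar", Object.class);

        Map<String, Object> headers = new HashMap<>();
        System.out.println("no header -> " + resolveTargetType(headers, mappings, Foo.class).getSimpleName());
        headers.put("__TypeId__", "foo");
        System.out.println("foo -> " + resolveTargetType(headers, mappings, Foo.class).getSimpleName());
        headers.put("__TypeId__", "bar");
        System.out.println("bar -> " + resolveTargetType(headers, mappings, Foo.class).getSimpleName());
    }
}
```

In this model the first two lookups resolve to Foo, so the listener's handle(Foo) can be invoked, while "bar" resolves to Object; in the real example that last case leaves the converter producing a generic map, which is why the final send fails.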