RabbitMQ - Apache Camel Reading消息如何处理失败的消息

Rob*_*_UK 6 java rabbitmq dead-letter

我有以下PHP应用程序.将用户signUp发布到消息队列.Java应用程序从该队列中读取并导入它.希望下面的图表能够描述它.我只是在Java方面工作.json消息已存在于队列中.

在此输入图像描述

路线(Java消费方).

@Component
public class SignUpRouting {

  errorHandler(deadLetterChannel("rabbitmq://signUpDeadLetter.exchange?username=etc..").useOriginalMessage());

  from("rabbitmq://phpSignUp.exchange?username=etc....")
            .routeId("signUpRoute")
            .processRef("signUpProcessor")
            .end();
  //.... 
Run Code Online (Sandbox Code Playgroud)

处理器..

@Component
public class SignupProcessor implements Processor {

    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void process(Exchange exchange) throws Exception {

        String json = exchange.getIn().getBody(String.class);
        SignUpDto dto = mapper.readValue(json, SignUpDto.class);

        SignUp signUp = new SignUp();
        signUp.setWhatever(dto.getWhatever());
        //etc....

        // save record
        signUpDao.save(signUp);
    }
}
Run Code Online (Sandbox Code Playgroud)

我的问题是这个..当处理器无法导入消息时,我该怎么办?

让我们说例如有一个DAO例外.数据字段可能是工具或导入的格式不正确.我不想失去这个消息.我想看到错误并重试导入.但我不想每30秒重复一次这条消息.

我想我需要创建另一个队列..一个死信队列并且每6个小时无限次地重试该消息?...然后我会查看日志查看错误并上传修复并且消息将被重新处理?

我该如何实现?还是我走错了路?

编辑 我已经尝试设置deadLetterExchange以查看是否会在正确的方向上进行操作...但是它出错并且说队列不能为空

 rabbitmq://phpSignUp.exchange?username=etc...&deadLetterExchange=signUpDeadLetter.exchange
Run Code Online (Sandbox Code Playgroud)

san*_*igo 2

以下是使用死信标头的示例:

        <from uri="rabbitmq://localhost/youexchange?queue=yourq1&amp;
            exchangeType=topic&amp;
            routingKey=user.reg.*&amp;
            deadLetterExchange=dead.msgs&amp;
            deadLetterExchangeType=topic&amp;
            deadLetterRoutingKey=dead.letters&amp;
            deadLetterQueue=dead.letters&amp;
            autoAck=false&amp;
            autoDelete=false"/>

          <!--We can use onException to make camel to retry, and after that, dead letter queue are the fallback-->
        <onException useOriginalMessage="true">
            <exception>java.lang.Exception</exception>
            <redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="3" redeliveryDelay="5000"/>
        </onException>
Run Code Online (Sandbox Code Playgroud)

我们需要关闭autoAck并设置deadLetterQueue,那么如果抛出异常,消息将进入死信队列。使用onException,我们可以控制camel将消息放入死信队列之前的重试。