在特定场景中,在死信队列和 TTL 循环之后,如何使用 Spring 将 AMQP 消息发布到停车场队列?

dze*_*sik 6 java spring amqp rabbitmq spring-amqp

我想在我的应用程序中实现以下场景:

  1. 如果发生业务错误,消息应该从incomingQueue发送到deadLetter Queue并在那里延迟10秒
  2. 步骤 1 应重复 3 次
  3. 该消息应发布到停车场队列

实现场景

当前 RabbitMQ 队列设置

我能够(参见下面的代码)在死信队列中将消息延迟一定的时间。并且消息在传入队列和死信队列之间无限循环。到目前为止,一切都很好。

主要问题:如何拦截该进程并手动将消息(如步骤 3 中所述)路由到 parkingLot 队列以供以后进一步分析?

第二个问题:我可以只用一次交换来实现相同的过程吗?

这是我的两个课程的简化版本:

配置类

@Configuration
public class MailRabbitMQConfig {

    @Bean
    TopicExchange incomingExchange() {
       TopicExchange incomingExchange = new TopicExchange(incomingExchangeName);
        return incomingExchange;
    }

    @Bean
    TopicExchange dlExchange() {
        TopicExchange dlExchange = new TopicExchange(deadLetterExchangeName);
        return dlExchange;
    }

    @Bean
    Queue incomingQueue() {

        return QueueBuilder.durable(incomingQueueName)
                .withArgument(
                        "x-dead-letter-exchange",
                        dlExchange().getName()
                )
                .build();
    }

    @Bean
    public Queue parkingLotQueue() {
        return new Queue(parkingLotQueueName);
    }

    @Bean
    Binding incomingBinding() {
        return BindingBuilder
                .bind(incomingQueue())
                .to(incomingExchange())
                .with("#");
    }

    @Bean
    public Queue dlQueue() {
        return QueueBuilder
                .durable(deadLetterQueueName)
                .withArgument("x-message-ttl", 10000)
                .withArgument("x-dead-letter-exchange", incomingExchange()
                    .getName())
                .build();
    }

    @Bean
    Binding dlBinding() {
        return BindingBuilder
                .bind(dlQueue())
                .to(dlExchange())
                .with("#");
    }

    @Bean
    public Binding bindParkingLot(
            Queue parkingLotQueue,
            TopicExchange dlExchange
    ) {

        return BindingBuilder.bind(parkingLotQueue)
                    .to(dlExchange)
                    .with(parkingLotRoutingKeyName);
    }
}
Run Code Online (Sandbox Code Playgroud)

消费类

@Component
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @RabbitListener(queues = "${mail.rabbitmq.queue.incoming}")
    public Boolean receivedMessage(MailDataExternalTemplate mailDataExternalTemplate) throws Exception {

        try {
            // business logic here
        } catch (Exception e) {
            throw new AmqpRejectAndDontRequeueException("Failed to handle a business logic");
        }

        return Boolean.TRUE;
    }
}
Run Code Online (Sandbox Code Playgroud)

我知道我可以在 Consumer 类中为 deadLetter 队列定义一个额外的侦听器,如下所示:

@RabbitListener(queues = "${mail.rabbitmq.queue.deadletter}")
public void receivedMessageFromDlq(Message failedMessage) throws Exception {
    // Logic to count x-retries header property value and send a failed message manually
    // to the parkingLot Queue
}
Run Code Online (Sandbox Code Playgroud)

然而,它并没有按预期工作,因为一旦消息到达 deadLetter 队列的头部,就会调用此侦听器,而不会延迟。

先感谢您。


编辑:我能够在@ArtemBilan 和@GaryRussell 的帮助下解决这个问题。主要解决方案提示位于已接受答案的评论中。谢谢你们的帮助!下面您将找到一个新图表,显示消息传递过程以及配置和消费者类。主要变化是:

  • 类中传入交换->传入队列和死信交换->死信队列之间的路由定义MailRabbitMQConfig
  • 手动将消息发布到类中的停车场队列的循环Consumer处理

在此输入图像描述

配置类

@Configuration
public class MailRabbitMQConfig {
    @Autowired
    public MailConfigurationProperties properties;

    @Bean
    TopicExchange incomingExchange() {
        TopicExchange incomingExchange = new TopicExchange(properties.getRabbitMQ().getExchange().getIncoming());
        return incomingExchange;
    }

    @Bean
    TopicExchange dlExchange() {
        TopicExchange dlExchange = new TopicExchange(properties.getRabbitMQ().getExchange().getDeadletter());
        return dlExchange;
    }

    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(properties.getRabbitMQ().getQueue().getIncoming())
            .withArgument(                 
                properties.getRabbitMQ().getQueue().X_DEAD_LETTER_EXCHANGE_HEADER,
                dlExchange().getName()
            )
            .withArgument(
                properties.getRabbitMQ().getQueue().X_DEAD_LETTER_ROUTING_KEY_HEADER,
                properties.getRabbitMQ().getRoutingKey().getDeadLetter()
            )
            .build();
    }

    @Bean
    public Queue parkingLotQueue() {
        return new Queue(properties.getRabbitMQ().getQueue().getParkingLot());
    }

    @Bean
    Binding incomingBinding() {
        return BindingBuilder
            .bind(incomingQueue())
            .to(incomingExchange())
            .with(properties.getRabbitMQ().getRoutingKey().getIncoming());
   }

    @Bean
    public Queue dlQueue() {
        return QueueBuilder
            .durable(properties.getRabbitMQ().getQueue().getDeadLetter())
            .withArgument(                      
                properties.getRabbitMQ().getMessages().X_MESSAGE_TTL_HEADER,
                properties.getRabbitMQ().getMessages().getDelayTime()
            )
            .withArgument(
                properties.getRabbitMQ().getQueue().X_DEAD_LETTER_EXCHANGE_HEADER,
                incomingExchange().getName()
            )
            .withArgument(
                properties.getRabbitMQ().getQueue().X_DEAD_LETTER_ROUTING_KEY_HEADER,
                properties.getRabbitMQ().getRoutingKey().getIncoming()
            )
            .build();
    }

    @Bean
    Binding dlBinding() {
        return BindingBuilder
            .bind(dlQueue())
            .to(dlExchange())
            .with(properties.getRabbitMQ().getRoutingKey().getDeadLetter());
    }

    @Bean
    public Binding bindParkingLot(
        Queue parkingLotQueue,
        TopicExchange dlExchange
    ) {
        return BindingBuilder.bind(parkingLotQueue)
            .to(dlExchange)
            .with(properties.getRabbitMQ().getRoutingKey().getParkingLot());
    }
}
Run Code Online (Sandbox Code Playgroud)

消费类

@Component
public class Consumer {
    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @Autowired
    public MailConfigurationProperties properties;

    @Autowired
    protected EmailClient mailJetEmailClient;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "${mail.rabbitmq.queue.incoming}")
    public Boolean receivedMessage(
        @Payload MailDataExternalTemplate mailDataExternalTemplate,
        Message amqpMessage
    ) {
        logger.info("Received message");

        try {
            final EmailTransportWrapper emailTransportWrapper = mailJetEmailClient.convertFrom(mailDataExternalTemplate);

            mailJetEmailClient.sendEmailUsing(emailTransportWrapper);
            logger.info("Successfully sent an E-Mail");
        } catch (Exception e) {
            int count = getXDeathCountFromHeader(amqpMessage);
            logger.debug("x-death count: " + count);

            if (count >= properties.getRabbitMQ().getMessages().getRetryCount()) {
                this.rabbitTemplate.send(
                     properties.getRabbitMQ().getExchange().getDeadletter(),
                     properties.getRabbitMQ().getRoutingKey().getParkingLot(),
                     amqpMessage
                );
                return Boolean.TRUE;
            }

            throw new AmqpRejectAndDontRequeueException("Failed to send an E-Mail");
        }

        return Boolean.TRUE;
    }

    private int getXDeathCountFromHeader(Message message) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        if (headers.get(properties.getRabbitMQ().getMessages().X_DEATH_HEADER) == null) {
            return 0;
        }

        //noinspection unchecked
        ArrayList<Map<String, ?>> xDeath = (ArrayList<Map<String, ?>>) headers
            .get(properties.getRabbitMQ().getMessages().X_DEATH_HEADER);
        Long count = (Long) xDeath.get(0).get("count");
        return count.intValue();
    }
Run Code Online (Sandbox Code Playgroud)

Art*_*lan 1

要延迟消息在队列中可用,您应该考虑使用DelayedExchangehttps ://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/_reference.html#delayed-message-交换

至于手动发送到parkingLot队列,这很容易使用RabbitTemplate并使用其名称发送消息:

/**
 * Send a message to a default exchange with a specific routing key.
 *
 * @param routingKey the routing key
 * @param message a message to send
 * @throws AmqpException if there is a problem
 */
void send(String routingKey, Message message) throws AmqpException;
Run Code Online (Sandbox Code Playgroud)

所有队列都通过其名称作为路由键绑定到默认交换机。