dze*_*sik 6 java spring amqp rabbitmq spring-amqp
我想在我的应用程序中实现以下场景:
我能够(参见下面的代码)在死信队列中将消息延迟一定的时间。并且消息在传入队列和死信队列之间无限循环。到目前为止,一切都很好。
主要问题:如何拦截该进程并手动将消息(如步骤 3 中所述)路由到 parkingLot 队列以供以后进一步分析?
第二个问题:我可以只用一次交换来实现相同的过程吗?
这是我的两个课程的简化版本:
配置类
@Configuration
public class MailRabbitMQConfig {
@Bean
TopicExchange incomingExchange() {
TopicExchange incomingExchange = new TopicExchange(incomingExchangeName);
return incomingExchange;
}
@Bean
TopicExchange dlExchange() {
TopicExchange dlExchange = new TopicExchange(deadLetterExchangeName);
return dlExchange;
}
@Bean
Queue incomingQueue() {
return QueueBuilder.durable(incomingQueueName)
.withArgument(
"x-dead-letter-exchange",
dlExchange().getName()
)
.build();
}
@Bean
public Queue parkingLotQueue() {
return new Queue(parkingLotQueueName);
}
@Bean
Binding incomingBinding() {
return BindingBuilder
.bind(incomingQueue())
.to(incomingExchange())
.with("#");
}
@Bean
public Queue dlQueue() {
return QueueBuilder
.durable(deadLetterQueueName)
.withArgument("x-message-ttl", 10000)
.withArgument("x-dead-letter-exchange", incomingExchange()
.getName())
.build();
}
@Bean
Binding dlBinding() {
return BindingBuilder
.bind(dlQueue())
.to(dlExchange())
.with("#");
}
@Bean
public Binding bindParkingLot(
Queue parkingLotQueue,
TopicExchange dlExchange
) {
return BindingBuilder.bind(parkingLotQueue)
.to(dlExchange)
.with(parkingLotRoutingKeyName);
}
}
Run Code Online (Sandbox Code Playgroud)
消费类
@Component
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Consumer.class);
@RabbitListener(queues = "${mail.rabbitmq.queue.incoming}")
public Boolean receivedMessage(MailDataExternalTemplate mailDataExternalTemplate) throws Exception {
try {
// business logic here
} catch (Exception e) {
throw new AmqpRejectAndDontRequeueException("Failed to handle a business logic");
}
return Boolean.TRUE;
}
}
Run Code Online (Sandbox Code Playgroud)
我知道我可以在 Consumer 类中为 deadLetter 队列定义一个额外的侦听器,如下所示:
@RabbitListener(queues = "${mail.rabbitmq.queue.deadletter}")
public void receivedMessageFromDlq(Message failedMessage) throws Exception {
// Logic to count x-retries header property value and send a failed message manually
// to the parkingLot Queue
}
Run Code Online (Sandbox Code Playgroud)
然而,它并没有按预期工作,因为一旦消息到达 deadLetter 队列的头部,就会调用此侦听器,而不会延迟。
先感谢您。
编辑:我能够在@ArtemBilan 和@GaryRussell 的帮助下解决这个问题。主要解决方案提示位于已接受答案的评论中。谢谢你们的帮助!下面您将找到一个新图表,显示消息传递过程以及配置和消费者类。主要变化是:
MailRabbitMQConfig。Consumer处理配置类
@Configuration
public class MailRabbitMQConfig {
@Autowired
public MailConfigurationProperties properties;
@Bean
TopicExchange incomingExchange() {
TopicExchange incomingExchange = new TopicExchange(properties.getRabbitMQ().getExchange().getIncoming());
return incomingExchange;
}
@Bean
TopicExchange dlExchange() {
TopicExchange dlExchange = new TopicExchange(properties.getRabbitMQ().getExchange().getDeadletter());
return dlExchange;
}
@Bean
Queue incomingQueue() {
return QueueBuilder.durable(properties.getRabbitMQ().getQueue().getIncoming())
.withArgument(
properties.getRabbitMQ().getQueue().X_DEAD_LETTER_EXCHANGE_HEADER,
dlExchange().getName()
)
.withArgument(
properties.getRabbitMQ().getQueue().X_DEAD_LETTER_ROUTING_KEY_HEADER,
properties.getRabbitMQ().getRoutingKey().getDeadLetter()
)
.build();
}
@Bean
public Queue parkingLotQueue() {
return new Queue(properties.getRabbitMQ().getQueue().getParkingLot());
}
@Bean
Binding incomingBinding() {
return BindingBuilder
.bind(incomingQueue())
.to(incomingExchange())
.with(properties.getRabbitMQ().getRoutingKey().getIncoming());
}
@Bean
public Queue dlQueue() {
return QueueBuilder
.durable(properties.getRabbitMQ().getQueue().getDeadLetter())
.withArgument(
properties.getRabbitMQ().getMessages().X_MESSAGE_TTL_HEADER,
properties.getRabbitMQ().getMessages().getDelayTime()
)
.withArgument(
properties.getRabbitMQ().getQueue().X_DEAD_LETTER_EXCHANGE_HEADER,
incomingExchange().getName()
)
.withArgument(
properties.getRabbitMQ().getQueue().X_DEAD_LETTER_ROUTING_KEY_HEADER,
properties.getRabbitMQ().getRoutingKey().getIncoming()
)
.build();
}
@Bean
Binding dlBinding() {
return BindingBuilder
.bind(dlQueue())
.to(dlExchange())
.with(properties.getRabbitMQ().getRoutingKey().getDeadLetter());
}
@Bean
public Binding bindParkingLot(
Queue parkingLotQueue,
TopicExchange dlExchange
) {
return BindingBuilder.bind(parkingLotQueue)
.to(dlExchange)
.with(properties.getRabbitMQ().getRoutingKey().getParkingLot());
}
}
Run Code Online (Sandbox Code Playgroud)
消费类
@Component
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Consumer.class);
@Autowired
public MailConfigurationProperties properties;
@Autowired
protected EmailClient mailJetEmailClient;
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "${mail.rabbitmq.queue.incoming}")
public Boolean receivedMessage(
@Payload MailDataExternalTemplate mailDataExternalTemplate,
Message amqpMessage
) {
logger.info("Received message");
try {
final EmailTransportWrapper emailTransportWrapper = mailJetEmailClient.convertFrom(mailDataExternalTemplate);
mailJetEmailClient.sendEmailUsing(emailTransportWrapper);
logger.info("Successfully sent an E-Mail");
} catch (Exception e) {
int count = getXDeathCountFromHeader(amqpMessage);
logger.debug("x-death count: " + count);
if (count >= properties.getRabbitMQ().getMessages().getRetryCount()) {
this.rabbitTemplate.send(
properties.getRabbitMQ().getExchange().getDeadletter(),
properties.getRabbitMQ().getRoutingKey().getParkingLot(),
amqpMessage
);
return Boolean.TRUE;
}
throw new AmqpRejectAndDontRequeueException("Failed to send an E-Mail");
}
return Boolean.TRUE;
}
private int getXDeathCountFromHeader(Message message) {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
if (headers.get(properties.getRabbitMQ().getMessages().X_DEATH_HEADER) == null) {
return 0;
}
//noinspection unchecked
ArrayList<Map<String, ?>> xDeath = (ArrayList<Map<String, ?>>) headers
.get(properties.getRabbitMQ().getMessages().X_DEATH_HEADER);
Long count = (Long) xDeath.get(0).get("count");
return count.intValue();
}
Run Code Online (Sandbox Code Playgroud)
要延迟消息在队列中可用,您应该考虑使用DelayedExchange:https ://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/_reference.html#delayed-message-交换。
至于手动发送到parkingLot队列,这很容易使用RabbitTemplate并使用其名称发送消息:
/**
* Send a message to a default exchange with a specific routing key.
*
* @param routingKey the routing key
* @param message a message to send
* @throws AmqpException if there is a problem
*/
void send(String routingKey, Message message) throws AmqpException;
Run Code Online (Sandbox Code Playgroud)
所有队列都通过其名称作为路由键绑定到默认交换机。
| 归档时间: |
|
| 查看次数: |
3329 次 |
| 最近记录: |