Cha*_*dan 9 rabbitmq spring-rabbit spring-amqp spring-rabbitmq
我是Spring AMQP的新手.我有一个应用程序,它是一个生产者向另一个消费者的应用程序发送消息.
消费者收到消息后,我们将对数据进行验证.
如果数据正确,我们必须确认并且应该从队列中删除消息.如果数据不正确,我们必须NACK(否定确认)数据,以便它将在RabbitMQ中重新排队.
我碰到
**factory.setDefaultRequeueRejected(false);**(它根本不会重新排列消息)
**factory.setDefaultRequeueRejected(true);**(它会在发生异常时重新排列消息)
但我的情况是,我将基于验证确认该消息.然后它应该删除该消息.如果NACK然后重新排列该消息.
我在RabbitMQ网站上看过
AMQP规范定义了basic.reject方法,该方法允许客户拒绝单个传递的消息,指示代理丢弃它们或重新排队它们
如何实现上述场景?请给我一些例子.
我尝试了一个小程序
logger.info("Job Queue Handler::::::::::" + new Date());
try {
}catch(Exception e){
logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::");
}
factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{
return cause instanceof XMLException;
}));
Run Code Online (Sandbox Code Playgroud)
消息不是为不同的异常factory.setDefaultRequeueRejected(true)重新排队
09:46:38,854 ERROR [stderr](SimpleAsyncTaskExecutor-1) org.activiti.engine.ActivitiObjectNotFoundException:没有使用键'WF89012'部署的进程
09:46:39,102 INFO [com.example.bip.rabbitmq.handler.ErrorQueueHandler](SimpleAsyncTaskExecutor-1)从错误队列收到:{ERROR =无法提交JPA事务; 嵌套异常是 javax.persistence.RollbackException:标记为rollbackOnly的事务 }
Gar*_*ell 11
请参阅文档.
默认情况下,defaultRequeueRejected=true如果侦听器正常退出或者在侦听器抛出异常时拒绝(并重新排队),则容器将使用该消息(使其被删除).
如果侦听器(或错误处理程序)抛出AmqpRejectAndDontRequeueException,则会覆盖默认行为并丢弃该消息(如果已配置,则路由到DLX/DLQ) - 容器调用basicReject(false)而不是basicReject(true).
因此,如果您的验证失败,请抛出一个AmqpRejectAndDontRequeueException.或者,使用自定义错误处理程序配置侦听器,以将异常转换为AmqpRejectAndDontRequeueException.
如果您真的想要自己负责,请将确认模式设置为MANUAL并使用一种ChannelAwareMessageListener或这种技术(如果您使用的话)@RabbitListener.
但大多数人只是让容器处理事情(一旦他们了解正在发生的事情).通常,使用手动ack是针对特殊用例,例如延迟acks或早期acking.
编辑
在我指出的答案中有一个错误(现在已经修复); 你必须看看原因ListenerExecutionFailedException.我刚测试了这个,它按预期工作...
@SpringBootApplication
public class So39530787Application {
private static final String QUEUE = "So39530787";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args);
RabbitTemplate template = context.getBean(RabbitTemplate.class);
template.convertAndSend(QUEUE, "foo");
template.convertAndSend(QUEUE, "bar");
template.convertAndSend(QUEUE, "baz");
So39530787Application bean = context.getBean(So39530787Application.class);
bean.latch.await(10, TimeUnit.SECONDS);
System.out.println("Expect 1 foo:" + bean.fooCount);
System.out.println("Expect 3 bar:" + bean.barCount);
System.out.println("Expect 1 baz:" + bean.bazCount);
context.close();
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setErrorHandler(new ConditionalRejectingErrorHandler(
t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException));
return factory;
}
@Bean
public Queue queue() {
return new Queue(QUEUE, false, false, true);
}
private int fooCount;
private int barCount;
private int bazCount;
private final CountDownLatch latch = new CountDownLatch(5);
@RabbitListener(queues = QUEUE)
public void handle(String in) throws Exception {
System.out.println(in);
latch.countDown();
if ("foo".equals(in) && ++this.fooCount < 3) {
throw new FooException();
}
else if ("bar".equals(in) && ++this.barCount < 3) {
throw new BarException();
}
else if ("baz".equals(in)) {
this.bazCount++;
}
}
@SuppressWarnings("serial")
public static class FooException extends Exception { }
@SuppressWarnings("serial")
public static class BarException extends Exception { }
}
Run Code Online (Sandbox Code Playgroud)
结果:
Expect 1 foo:1
Expect 3 bar:3
Expect 1 baz:1
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
10512 次 |
| 最近记录: |