RabbitMQ: delaying a message when an exception occurs

abu*_*lor 5 rabbitmq spring-rabbit rabbitmq-exchange

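I am using this plugin (https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq) and it works well. I send a message with a delay of X seconds, and it is processed X seconds later.

The problem is in the processing logic. When processing succeeds (the happy path), there is no problem. But when processing fails, I expect the message to be requeued and processed again after the same delay; instead, it is processed again immediately.

Is there a way to automatically requeue the message with the originally specified delay when an exception occurs?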

Gar*_*ell 2

No; the delay does not apply to retries. If message order does not matter, you can republish the message to the tail of the queue.
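To illustrate why ordering matters for this option, here is a minimal plain-Java sketch that stands in for the queue with an in-memory `ArrayDeque`. The class name `TailRequeueSketch` and the message values are hypothetical, and no broker or Spring classes are involved. In a real application the republish would presumably go back through the delayed exchange with the delay header set again; that part is not shown.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java stand-in for a queue consumer; an assumption for illustration only.
final class TailRequeueSketch {

    /**
     * Drains the queue in order. The first time "fail" is seen it is treated as a
     * processing failure and appended to the tail instead of being retried in place.
     * Returns the order in which messages were successfully processed.
     */
    static List<String> drain(Deque<String> queue) {
        List<String> processed = new ArrayList<>();
        boolean failedOnce = false;
        while (!queue.isEmpty()) {
            String message = queue.pollFirst();
            if (message.equals("fail") && !failedOnce) {
                failedOnce = true;
                queue.addLast(message); // "republish" to the tail of the queue
                continue;
            }
            processed.add(message);
        }
        return processed;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(List.of("good-1", "fail", "good-2", "good-3"));
        System.out.println(drain(queue)); // prints [good-1, good-2, good-3, fail]
    }
}
```

The failed message is handled after everything that was queued behind it, which is why this approach only fits when consumers do not depend on message order.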

Alternatively, you can configure a retry interceptor with a fixed back-off.

https://docs.spring.io/spring-amqp/docs/current/reference/html/#retry

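Spring Retry provides a couple of AOP interceptors and a great deal of flexibility to specify the parameters of the retry (number of attempts, exception types, backoff algorithm, and others). Spring AMQP also provides some convenience factory beans for creating Spring Retry interceptors in a convenient form for AMQP use cases, with strongly typed callback interfaces that you can use to implement custom recovery logic. ...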
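As a rough sketch of the retry-interceptor option, the configuration below builds a stateless interceptor with a fixed back-off and adds it to a listener container factory. It assumes a Spring AMQP version that still builds on Spring Retry (2.x or 3.x). The class name `RetryConfig`, the 5-second period, the limit of 3 attempts, and the choice of `RejectAndDontRequeueRecoverer` are illustrative assumptions, not values from the question.

```java
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

// Hypothetical configuration class; all values are assumptions for illustration.
@Configuration
class RetryConfig {

    @Bean
    RetryOperationsInterceptor retryInterceptor() {
        FixedBackOffPolicy backOff = new FixedBackOffPolicy();
        backOff.setBackOffPeriod(5000); // wait 5 seconds between attempts

        return RetryInterceptorBuilder.stateless()
                .maxAttempts(3)                                  // first delivery plus two retries
                .backOffPolicy(backOff)
                .recoverer(new RejectAndDontRequeueRecoverer())  // reject without requeue once retries are exhausted
                .build();
    }

    @Bean
    SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAdviceChain(retryInterceptor); // wrap listener invocations with the retry advice
        return factory;
    }
}
```

With a stateless interceptor the retries run on the listener thread, so the message is not returned to the broker between attempts and that consumer is blocked for the duration of each back-off.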

EDIT

Using a RabbitTemplate instead of a message-driven listener:

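package com.example.demo;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;

@SpringBootApplication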
@EnableScheduling
@EnableTransactionManagement
public class So69020120Application {

    public static void main(String[] args) {
        SpringApplication.run(So69020120Application.class, args);
    }

    @Autowired
    Processor processor;

    @Scheduled(fixedDelay = 5000)
    public void sched() {
        try {
            while (this.processor.process()) {
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    RabbitTransactionManager transactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            IntStream.range(0,  4).forEach(i -> template.convertAndSend("queue", "good"));
            template.convertAndSend("queue", "fail");
            IntStream.range(0,  4).forEach(i -> template.convertAndSend("queue", "good"));
        };
    }

    @Bean
    Queue queue() {
        return new Queue("queue");
    }

}

@Component
class Processor {

    private final RabbitTemplate template;

    private final AtomicBoolean fail = new AtomicBoolean(true);

    Processor(RabbitTemplate template) {
        this.template = template;
        template.setChannelTransacted(true);
    }

    @Transactional
    public boolean process() {
        String data = (String) template.receiveAndConvert("queue");
        if (data == null) {
            System.out.println("No More Messages");
            return false;
        }
        System.out.println(data);
        if (data.equals("fail") && this.fail.getAndSet(false)) {
            throw new RuntimeException("test");
        }
        return true;
    }

}

Output:

good
good
good
good
fail
java.lang.RuntimeException: test
    at com.example.demo.Processor.process(So69020120Application.java:86)
    at com.example.demo.Processor$$FastClassBySpringCGLIB$$6adeaa38.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at com.example.demo.Processor$$EnhancerBySpringCGLIB$$bad30db1.process(<generated>)
    at com.example.demo.So69020120Application.sched(So69020120Application.java:36)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
fail
good
good
good
good
No More Messages