RabbitMQ 在事务中发送消息

use*_*718 8 rabbitmq spring-amqp spring-boot

是否可以在事务中运行以下代码,以便如果业务处理中引发异常,我们可以回滚发送到队列的消息?

rabbitTemplate.convertAndSend("queue1", data);

//do some processing

rabbitTemplate.convertAndSend("queue2", data);
Run Code Online (Sandbox Code Playgroud)

需要这样做的是,如果在向队列 1 发送消息后出现问题,但我们无法向队列 2 发送消息,该怎么办?或者,如果在将消息发送到队列时出现网络问题或其他问题怎么办?

Gar*_*ell 9

onMessage()如果此代码在侦听器容器线程(或)上运行,@RabbitListener并且容器和模板都具有setChannelTransacted(true),则发布(和交付)将在同一事务中运行;抛出异常将导致所有内容回滚。

如果这是在某个任意 java 类中(不在容器线程上运行),那么您需要在方法运行之前启动事务...

    @Transactional
    public void send(String in) {
        this.template.convertAndSend("foo", in);
        if (in.equals("foo")) {
            throw new RuntimeException("test");
        }
        this.template.convertAndSend("bar", in);
    }
Run Code Online (Sandbox Code Playgroud)

这是一个完整的 Spring Boot 应用程序,演示了该功能......

@SpringBootApplication
@EnableTransactionManagement
public class So40749877Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(So40749877Application.class, args);
        Foo foo = context.getBean(Foo.class);
        try {
            foo.send("foo");
        }
        catch (Exception e) {}
        foo.send("bar");
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        // should not get any foos...
        System.out.println(template.receiveAndConvert("foo", 10_000));
        System.out.println(template.receiveAndConvert("bar", 10_000));
        // should be null
        System.out.println(template.receiveAndConvert("foo", 0));
        RabbitAdmin admin = context.getBean(RabbitAdmin.class);
        admin.deleteQueue("foo");
        admin.deleteQueue("bar");
        context.close();
    }

    @Bean
    public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    @Bean
    public Queue foo() {
        return new Queue("foo");
    }

    @Bean
    public Queue bar() {
        return new Queue("bar");
    }

    @Bean
    public Foo fooBean() {
        return new Foo();
    }

    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

    public static class Foo {

        @Autowired
        private RabbitTemplate template;

        @Transactional
        public void send(String in) {
            this.template.convertAndSend("foo", in);
            if (in.equals("foo")) {
                throw new RuntimeException("test");
            }
            this.template.convertAndSend("bar", in);
        }

    }

}
Run Code Online (Sandbox Code Playgroud)

编辑

消费者端的交易;这在使用 Spring 时通常不适用,因为它管理事务,但是当直接使用客户端时......

Connection connection = cf.createConnection();
Channel channel = connection.createChannel(true);
channel.basicQos(1);
channel.txSelect();
CountDownLatch latch = new CountDownLatch(1);
channel.basicConsume("foo", new DefaultConsumer(channel) {

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
            byte[] body) throws IOException {
        System.out.println(new String(body));

        getChannel().txRollback(); // delivery won't be requeued; remains unacked

        if (envelope.isRedeliver()) {
            getChannel().basicAck(envelope.getDeliveryTag(), false);
            getChannel().txCommit(); // commit the ack so the message is removed
            getChannel().basicCancel(consumerTag);
            latch.countDown();
        }
        else { // first time, let's requeue
            getChannel().basicReject(envelope.getDeliveryTag(), true);
            getChannel().txCommit(); // commit the reject so the message will be requeued
        }
    }

});
latch.await();
channel.close();
connection.close();
Run Code Online (Sandbox Code Playgroud)

请注意,txRollback在这种情况下, 不执行任何操作;只有确认(或拒绝)是事务性的。