use*_*718 8 rabbitmq spring-amqp spring-boot
是否可以在事务中运行以下代码,以便如果业务处理中引发异常,我们可以回滚发送到队列的消息?
rabbitTemplate.convertAndSend("queue1", data);
//do some processing
rabbitTemplate.convertAndSend("queue2", data);
Run Code Online (Sandbox Code Playgroud)
需要这样做的是,如果在向队列 1 发送消息后出现问题,但我们无法向队列 2 发送消息,该怎么办?或者,如果在将消息发送到队列时出现网络问题或其他问题怎么办?
onMessage()
如果此代码在侦听器容器线程(或)上运行,@RabbitListener
并且容器和模板都具有setChannelTransacted(true)
,则发布(和交付)将在同一事务中运行;抛出异常将导致所有内容回滚。
如果这是在某个任意 java 类中(不在容器线程上运行),那么您需要在方法运行之前启动事务...
@Transactional
public void send(String in) {
this.template.convertAndSend("foo", in);
if (in.equals("foo")) {
throw new RuntimeException("test");
}
this.template.convertAndSend("bar", in);
}
Run Code Online (Sandbox Code Playgroud)
这是一个完整的 Spring Boot 应用程序,演示了该功能......
@SpringBootApplication
@EnableTransactionManagement
public class So40749877Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(So40749877Application.class, args);
Foo foo = context.getBean(Foo.class);
try {
foo.send("foo");
}
catch (Exception e) {}
foo.send("bar");
RabbitTemplate template = context.getBean(RabbitTemplate.class);
// should not get any foos...
System.out.println(template.receiveAndConvert("foo", 10_000));
System.out.println(template.receiveAndConvert("bar", 10_000));
// should be null
System.out.println(template.receiveAndConvert("foo", 0));
RabbitAdmin admin = context.getBean(RabbitAdmin.class);
admin.deleteQueue("foo");
admin.deleteQueue("bar");
context.close();
}
@Bean
public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean
public Queue foo() {
return new Queue("foo");
}
@Bean
public Queue bar() {
return new Queue("bar");
}
@Bean
public Foo fooBean() {
return new Foo();
}
@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
public static class Foo {
@Autowired
private RabbitTemplate template;
@Transactional
public void send(String in) {
this.template.convertAndSend("foo", in);
if (in.equals("foo")) {
throw new RuntimeException("test");
}
this.template.convertAndSend("bar", in);
}
}
}
Run Code Online (Sandbox Code Playgroud)
编辑
消费者端的交易;这在使用 Spring 时通常不适用,因为它管理事务,但是当直接使用客户端时......
Connection connection = cf.createConnection();
Channel channel = connection.createChannel(true);
channel.basicQos(1);
channel.txSelect();
CountDownLatch latch = new CountDownLatch(1);
channel.basicConsume("foo", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
System.out.println(new String(body));
getChannel().txRollback(); // delivery won't be requeued; remains unacked
if (envelope.isRedeliver()) {
getChannel().basicAck(envelope.getDeliveryTag(), false);
getChannel().txCommit(); // commit the ack so the message is removed
getChannel().basicCancel(consumerTag);
latch.countDown();
}
else { // first time, let's requeue
getChannel().basicReject(envelope.getDeliveryTag(), true);
getChannel().txCommit(); // commit the reject so the message will be requeued
}
}
});
latch.await();
channel.close();
connection.close();
Run Code Online (Sandbox Code Playgroud)
请注意,txRollback
在这种情况下, 不执行任何操作;只有确认(或拒绝)是事务性的。
归档时间: |
|
查看次数: |
14791 次 |
最近记录: |