Maj*_*imi 3 java amqp rabbitmq
我在我的应用程序中执行以下操作:
从经纪人那里得到1条消息(手动确认)
做一些处理
在数据库和代理上启动事务
在数据库中插入一些记录并在代理上发布一些消息(不同的队列)
提交数据库和代理
您在步骤1中从经纪人处获得的确认消息.
经纪人的所有操作都是通过单一渠道完成的.这是准备代码:
Connection brokerConnection = factory.newConnection();
Channel channel = brokerConnection.createChannel();
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("receive-queue", false, consumer);
Run Code Online (Sandbox Code Playgroud)
以下是我的代码.我已删除try,catch部分讲清楚.我将所有例外记录到文件中.步骤1:
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
Request request = (Request) SerializationUtils.deserialize(delivery.getBody());
Run Code Online (Sandbox Code Playgroud)
第2,3,4,5步:
dbConnection.setAutoCommit(false);
channel.txSelect();
stmt = dbConnection.prepareStatement(query);
/* set paramteres */
stmt.executeUpdate();
channel.basicPublish(/* exchange name */, "KEY", MessageProperties.PERSISTENT_BASIC, /* result */ result);
dbConnection.commit();
channel.txCommit();
dbConnection.setAutoCommit(true);
Run Code Online (Sandbox Code Playgroud)
第6步:
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
Run Code Online (Sandbox Code Playgroud)
在一次迭代之后,我可以在数据库和代理中看到记录(意味着它在第5步之前正常工作).问题是在步骤6之后未删除接收队列上的消息,并且管理插件显示一条未确认的消息.此外,我在日志文件中看不到任何异常.有人可以帮忙吗?
[UPDATE1]
现在我创建一个用于发布的频道和另一个用于接收的频道.现在正在运作.那么如何使用单一渠道进行接收和发布(使用交易)?我之前使用过单个频道进行接收和发布,但没有交易.
[UPDATE2]
我在事务中移动了第6步,它现在正在运行.
dbConnection.setAutoCommit(false);
channel.txSelect();
stmt = dbConnection.prepareStatement(query);
/* set paramteres */
stmt.executeUpdate();
channel.basicPublish(/* exchange name */, "KEY", MessageProperties.PERSISTENT_BASIC, /* result */ result);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
dbConnection.commit();
channel.txCommit();
dbConnection.setAutoCommit(true);
Run Code Online (Sandbox Code Playgroud)
我有点困惑.我只是希望发布部分在内部事务中.