在RabbitMQ中手动确认消息

lam*_*dar 3 rabbitmq spring-amqp spring-rabbitmq

以前我正在读取队列中存在的所有消息,但现在我必须根据用户选择(计数)返回特定数量的消息.

我尝试相应地更改for循环,但由于自动确认,它读取所有消息.所以我尝试在配置文件中将其更改为手动.

在我的程序中如何在阅读msg后手动确认消息(目前我正在使用AmqpTemplate接收并且我没有频道参考)?

    Properties properties = admin.getQueueProperties("queue_name");
    if(null != properties)
    {
        Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());          
        while(messageCount > 0)
        {
            Message msg = amqpTemplate.receive(queue_name);
            String value = new String(msg.getBody());

            valueList.add(value);
            messageCount--;
        }
}
Run Code Online (Sandbox Code Playgroud)

任何帮助都非常值得感谢,提前谢谢.

Gar*_*ell 10

您不能手动使用该receive()方法 - 使用SimpleMessageListenerContainer带有MANUAL acks和a的事件驱动的消费者ChannelAwareMessageListener.或者,使用模板的execute()方法,您可以访问Channel- 然后您将使用较低级别的RabbitMQ API,而不是Message抽象.

编辑:

你需要学习底层的RabbitMQ Java API来使用execute,但是这样的东西会起作用......

    final int messageCount = 3;
    boolean result = template.execute(new ChannelCallback<Boolean>() {

        @Override
        public Boolean doInRabbit(final Channel channel) throws Exception {
            int n = messageCount;
            channel.basicQos(messageCount); // prefetch
            long deliveryTag = 0;
            while (n > 0) {
                GetResponse result = channel.basicGet("si.test.queue", false);
                if (result != null) {
                    System.out.println(new String(result.getBody()));
                    deliveryTag = result.getEnvelope().getDeliveryTag();
                    n--;
                }
                else {
                    Thread.sleep(1000);
                }
            }
            if (deliveryTag > 0) {
                channel.basicAck(deliveryTag, true);
            }
            return true;
        }
    });
Run Code Online (Sandbox Code Playgroud)