amqplib - 尽管调用了channel.ack(msg),但仍然没有消息

Jaz*_*lli 1 amqp rabbitmq node.js

我正在编写一个模块,作为amqplib的包装器.我们的动机是,我们已经建立了一个现有的,定义良好的交换/队列/绑定,我只想公开使用消费方法以允许消费传入数据.

为此,我的模块采用了表单的回调参数callback(channel, msg).在模块中,在设置了交换和队列以及绑定之后,我有以下内容

module.exports = function (options, callback) {

  /* connection, exchange and queue set up here */

  // consume messages from primary queue
  ok = ok.then(function() {
    var q = opts.pq;
    console.log('Subscribing to', q);
    return ch.consume(q, function (message) {
      callback(ch, message);
    });
  });

  return ok;

};
Run Code Online (Sandbox Code Playgroud)

在回调中,我正在处理消息,并在成功时调用channel.ack(msg).

一切都运行正常,因为没有错误,但是RabbitMQ管理控制台将所有已处理的消息显示为Unacked.如果我然后杀了我的应用程序,Unacked消息将返回队列(谢天谢地).

为什么我的消息没有被删除?我在ack'ing做错了吗?我应该allUpTo在调用中将参数提供为true ack吗?

环境详情

node -v
v0.8.26

npm ls
...
amqplib@0.1.1
Run Code Online (Sandbox Code Playgroud)

Jaz*_*lli 5

为了完整,用我发现的东西回答我自己的问题.这实际上取决于node-amqpamqp.node之间默认设置的差异以及我的消费者中的一个稍微有缺陷的实现.

订阅队列时,node-amqp默认使用a prefetch=1,这意味着任何时候只有1条消息在传输中.另一个将在上一次被确认之前交付.但是,默认为amqp.node prefetch=0,意味着所有消息都尽快发送给消费者,并且当消费者完成每个消息时,可以在将来的某个时间点单独确认消息.

这是我在管理控制台中看到的,它引起了警报并导致了这个问题.在这个github问题中可以看到对此的讨论.

ASIDE - 鉴于上述信息,我让我的消费者继续运行,让它在准备就绪时发出消息.这为我的实现提出了一个单独的问题.鉴于我们正在传递消息的方式(即一次性传递),消息处理程序正在通过请求新连接来轰炸数据库.结果,数据库服务器过载,最终消费者死亡.为了解决这个问题,我刚刚切换到prefetch=1,如前所述,意味着上面的消费承诺方法现在如下

module.exports = function (options, callback) {

  /* connection, exchange and queue set up here */

  // consume messages from primary queue
  ok = ok.then(function() {
    var q = opts.pq;
    console.log('Subscribing to', q);

    ch.prefetch(1);   // <-- only get 1 message at a time

    return ch.consume(q, function (message) {
      callback(ch, message);
    }, { noAck: false });
  });


  return ok;

};
Run Code Online (Sandbox Code Playgroud)

值得注意的是,如果有人面临类似的情况.