Jaz*_*lli 1 amqp rabbitmq node.js
我正在编写一个模块,作为amqplib的包装器.我们的动机是,我们已经建立了一个现有的,定义良好的交换/队列/绑定,我只想公开使用消费方法以允许消费传入数据.
为此,我的模块采用了表单的回调参数callback(channel, msg).在模块中,在设置了交换和队列以及绑定之后,我有以下内容
module.exports = function (options, callback) {
/* connection, exchange and queue set up here */
// consume messages from primary queue
ok = ok.then(function() {
var q = opts.pq;
console.log('Subscribing to', q);
return ch.consume(q, function (message) {
callback(ch, message);
});
});
return ok;
};
Run Code Online (Sandbox Code Playgroud)
在回调中,我正在处理消息,并在成功时调用channel.ack(msg).
一切都运行正常,因为没有错误,但是RabbitMQ管理控制台将所有已处理的消息显示为Unacked.如果我然后杀了我的应用程序,Unacked消息将返回队列(谢天谢地).
为什么我的消息没有被删除?我在ack'ing做错了吗?我应该allUpTo在调用中将参数提供为true ack吗?
环境详情
node -v
v0.8.26
npm ls
...
amqplib@0.1.1
Run Code Online (Sandbox Code Playgroud)
为了完整,用我发现的东西回答我自己的问题.这实际上取决于node-amqp和amqp.node之间默认设置的差异以及我的消费者中的一个稍微有缺陷的实现.
订阅队列时,node-amqp默认使用a prefetch=1,这意味着任何时候只有1条消息在传输中.另一个将在上一次被确认之前交付.但是,默认为amqp.node prefetch=0,意味着所有消息都尽快发送给消费者,并且当消费者完成每个消息时,可以在将来的某个时间点单独确认消息.
这是我在管理控制台中看到的,它引起了警报并导致了这个问题.在这个github问题中可以看到对此的讨论.
ASIDE - 鉴于上述信息,我让我的消费者继续运行,让它在准备就绪时发出消息.这为我的实现提出了一个单独的问题.鉴于我们正在传递消息的方式(即一次性传递),消息处理程序正在通过请求新连接来轰炸数据库.结果,数据库服务器过载,最终消费者死亡.为了解决这个问题,我刚刚切换到prefetch=1,如前所述,意味着上面的消费承诺方法现在如下
module.exports = function (options, callback) {
/* connection, exchange and queue set up here */
// consume messages from primary queue
ok = ok.then(function() {
var q = opts.pq;
console.log('Subscribing to', q);
ch.prefetch(1); // <-- only get 1 message at a time
return ch.consume(q, function (message) {
callback(ch, message);
}, { noAck: false });
});
return ok;
};
Run Code Online (Sandbox Code Playgroud)
值得注意的是,如果有人面临类似的情况.
| 归档时间: |
|
| 查看次数: |
2462 次 |
| 最近记录: |