我正在使用amqp.node库将rabbitmq集成到我的系统中。
但是在使用者中,我一次只想处理一条消息,然后确认该消息,然后使用队列中的下一条消息。
当前代码是:
// Consumer
open.then(function(conn) {
var ok = conn.createChannel();
ok = ok.then(function(ch) {
ch.assertQueue(q);
ch.consume(q, function(msg) {
if (msg !== null) {
othermodule.processMessage(msg, function(error, response){
console.log(msg.content.toString());
ch.ack(msg);
});
}
});
});
return ok;
}).then(null, console.warn);
Run Code Online (Sandbox Code Playgroud)
ch.consume将一次处理通道中的所有消息,并且在此处调用模块的功能othermodule不会在同一时间线执行。
我要等待othermodule函数完成,然后再使用队列中的下一条消息。
Hoa*_*inh 12
此时此刻(2018 年),我认为 RabbitMQ 团队可以选择这样做:
https://www.rabbitmq.com/tutorials/tutorial-two-javascript.html
ch.prefetch(1);
Run Code Online (Sandbox Code Playgroud)
为了解决这个问题,我们可以使用值为 1 的 prefetch 方法。这告诉 RabbitMQ 不要一次给一个工作线程多于一条消息。或者,换句话说,在工作人员处理并确认前一条消息之前,不要向工作人员发送新消息。相反,它会将其分派给下一个不忙的工作人员。
创建模型时,您需要在其上设置 QOS。下面是我们在 C# 中的做法:
\n\n var _model = rabbitConnection.CreateModel();\n // Configure the Quality of service for the model. Below is how what each setting means.\n // BasicQos(0="Dont send me a new message untill I\xe2\x80\x99ve finshed", _fetchSize = "Send me N messages at a time", false ="Apply to this Model only")\n _model.BasicQos(0, _fetchSize, false);\n var consumerTag = _model.BasicConsume(rabbitQueue.QueueName, false, _consumerName, queueingConsumer);\nRun Code Online (Sandbox Code Playgroud)\n