使用 Node.js 使用 RabbitMQ 消息时可以等待进程完成吗?

Ken*_*all 7 javascript rabbitmq node.js es6-promise

我对 Node.js 和 ES6 还很陌生,这让我有点困惑。我试图让进程保持运行状态,消耗来自 RabbitMQ 队列的消息。它需要能够处理该消息(大约需要 30-60 秒),然后才能获取下一条消息。目前,我拥有的代码,它会获取所有可以获取的消息,然后尝试分叉进程。当队列中有 3-5 条消息时,这很好,但对于 20、50 或 100 条消息,这会导致服务器内存不足。

我尝试使.consume()回调函数异步并添加await到消息处理函数中。我尝试await new Promise.consume()回调中包装一个processMessage。我尝试添加await到调用channel.consume. 没有什么改变行为。

#!/usr/bin/env node

const amqp = require('amqplib');

const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
    const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
    const cluster = await amqp.connect(conn_str);
    const channel = await cluster.createChannel();
    await channel.assertQueue(queue,  { durable: durable, autoDelete: true });
    if (prefetch) {
        channel.prefetch(prefetch);
    }
    console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)

    try {
        channel.consume(queue, message => {
            if (message !== null) {
                console.log(' [x] Received', message.content.toString());
                processMessage(message.content.toString());
                channel.ack(message);
                return null;
            } else {
                console.log(error, 'Queue is empty!')
                channel.reject(message);
            }
        }, {noAck: isNoAck});
    } catch (error) {
        console.log(error, 'Failed to consume messages from Queue!')
        cluster.close(); 
    }
}

exports.consumeFromQueue = consumeFromQueue;
Run Code Online (Sandbox Code Playgroud)

作为旁注,如果我创建一个字符串数组并循环遍历字符串,当我将等待添加到该processMessage行时,它会在处理下一个字符串之前等待执行进程(30-60 秒)。

(async () => {
    for (let i=0; i<urls.length; i++) {
        await processMessage(urls[i]);
    }
})();
Run Code Online (Sandbox Code Playgroud)

所以我基本上需要类似这样的功能,但是要监听 RabbitMQ 中的队列。

shk*_*per 9

如果您想限制消费者在任何给定时间处理的消息数量,请使用channel.prefetch()

给出的计数是通过通道发送的可等待确认的最大消息数;一旦有 count 条消息未完成,服务器将不会在此通道上发送更多消息,直到一条或多条消息得到确认。

也就是说,如果您只想在继续处理下一条消息之前一次处理一条消息,请设置channel.prefetch(1)