在工作线程中处理异步消息

use*_*201 3 web-worker node.js async-await

我们正在使用 NodeJS 实验工作器来完成一些 CPU 密集型任务。这些任务是通过消息传递的消息启动的parentPort。在线程运行期间,它们需要将数据持久化到数据库,这是由 Promise 支持的异步操作。

我们看到的是,parentPort在执行异步操作时,消息不断发送到处理函数。

我们正在执行的代码示例:

const { parentPort, Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);

  const i = [1, 2, 3, 4, 5, 6, 7, 8, 9];
  for (const x of i) {
    worker.postMessage({ idx: x });
  }
} else {
  parentPort.on('message', async (value) => {
    await testAsync(value);
  });
}

async function testAsync(value) {
  return new Promise((resolve) => {
    console.log(`Starting wait for ${value.idx}`);
    setTimeout(() => {
      console.log(`Complete resolve for ${value.idx}`);
      resolve();

      if(value.idx == 9) {
        setTimeout(() => process.exit(0), 2000);
      }
    }, 500);
  });
}
Run Code Online (Sandbox Code Playgroud)

Starting wait for ...在上面的示例中,我们在任何消息出现之前看到打印Complete resolve ...。我们期望async-await事件处理程序在处理新事件之前等待已解决的承诺。在实际示例中,数据库连接可能会失败,从而引发异常,因此我们希望在接受新消息之前确保当前消息已被完全处理。

我们在这里做错了什么吗?

如果没有,是否有办法实现按顺序处理事件的预期目标?

Mar*_*nde 5

看来您想将消息排入队列,并且一次只处理一件事。

parentPort.on('message', () => {}是一个事件监听器,当事件被触发时,不会等到回调内之前的异步操作完成。

所以,如果触发'message'一千次,testAsync就会执行一千次,无需等待。

您需要在worker中实现一个队列,并限制并发数。NPM 中有多个 Promise 队列包。

p-queue我将在这个例子中使用。

const PQueue = require('p-queue');

const { parentPort, Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);

  const i = [1, 2, 3, 4, 5, 6, 7, 8, 9];
  for (const x of i) {
    worker.postMessage({ idx: x });
  }
} else {

    const queue = new PQueue({ concurrency: 1 }); // set concurrency

    parentPort.on('message', value => {
      queue.add(() => testAsync(value));
    });
}

async function testAsync(value) {
  return new Promise((resolve) => {
    console.log(`Starting wait for ${value.idx}`);
    setTimeout(() => {
      console.log(`Complete resolve for ${value.idx}`);
      resolve();

      if(value.idx == 9) {
        setTimeout(() => process.exit(0), 2000);
      }
    }, 500);
  });
}
Run Code Online (Sandbox Code Playgroud)

现在输出将是:

starting wait for 1
complete resolve for 1

starting wait for 2
complete resolve for 2

starting wait for N
complete resolve for N
Run Code Online (Sandbox Code Playgroud)