协调node.js中的并行执行

Thi*_*ilo 79 javascript parallel-processing concurrency fork-join node.js

node.js的事件驱动编程模型使得协调程序流有点棘手.

简单的顺序执行变成了嵌套的回调,这很容易(虽然写下来有点复杂).

但并行执行怎么样?假设您有三个可以并行运行的任务A,B,C,当它们完成时,您希望将结果发送到任务D.

使用fork/join模型,这将是

  • 叉子A.
  • 叉子B.
  • 叉子C.
  • 加入A,B,C,运行D.

我如何在node.js中编写它?有没有最佳做法或烹饪书?我是否每次都必须手动滚动解决方案,或者是否有一些带帮助程序的库?

sle*_*man 126

在node.js中没有什么是真正的并行,因为它是单线程的.但是,可以按照您无法事先确定的顺序安排和运行多个事件.而数据库访问等实际上是"并行"的,因为数据库查询本身在不同的线程中运行,但在完成时会重新集成到事件流中.

那么,如何在多个事件处理程序上安排回调?嗯,这是浏览器端javascript动画中使用的一种常用技术:使用变量来跟踪完成情况.

这听起来像是一个黑客,它听起来可能是混乱的,留下一堆全局变量围绕着跟踪,而用较少的语言.但在javascript中我们可以使用闭包:

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var callback = function () {
    counter --;
    if (counter == 0) {
      shared_callback()
    }
  }

  for (var i=0;i<async_calls.length;i++) {
    async_calls[i](callback);
  }
}

// usage:
fork([A,B,C],D);
Run Code Online (Sandbox Code Playgroud)

在上面的例子中,我们通过假设async和callback函数不需要参数来保持代码简单.您当然可以修改代码以将参数传递给异步函数,并让回调函数累积结果并将其传递给shared_callback函数.


补充答案:

实际上,即使是这样,该fork()函数也可以使用闭包将参数传递给异步函数:

fork([
  function(callback){ A(1,2,callback) },
  function(callback){ B(1,callback) },
  function(callback){ C(1,2,callback) }
],D);
Run Code Online (Sandbox Code Playgroud)

剩下要做的唯一事情是累积A,B,C的结果并将它们传递给D.


更多补充答案:

我无法抗拒.在早餐期间一直想着这个.这fork()是累积结果的实现(通常作为参数传递给回调函数):

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var all_results = [];
  function makeCallback (index) {
    return function () {
      counter --;
      var results = [];
      // we use the arguments object here because some callbacks 
      // in Node pass in multiple arguments as result.
      for (var i=0;i<arguments.length;i++) {
        results.push(arguments[i]);
      }
      all_results[index] = results;
      if (counter == 0) {
        shared_callback(all_results);
      }
    }
  }

  for (var i=0;i<async_calls.length;i++) {
    async_calls[i](makeCallback(i));
  }
}
Run Code Online (Sandbox Code Playgroud)

这很容易.这fork()非常通用,可用于同步多个非同类事件.

Node.js中的示例用法:

// Read 3 files in parallel and process them together:

function A (c){ fs.readFile('file1',c) };
function B (c){ fs.readFile('file2',c) };
function C (c){ fs.readFile('file3',c) };
function D (result) {
  file1data = result[0][1];
  file2data = result[1][1];
  file3data = result[2][1];

  // process the files together here
}

fork([A,B,C],D);
Run Code Online (Sandbox Code Playgroud)

更新

此代码是在存在像async.js或各种基于promise的库之类的库之前编写的.我想相信async.js的灵感来自于此,但我没有任何证据.无论如何..如果你今天想要这样做,请看看async.js或promises.只需考虑上面的答案,就async.parallel之类的工作方式做了很好的解释/说明.

为了完整起见,以下是您如何做到这一点async.parallel:

var async = require('async');

async.parallel([A,B,C],D);
Run Code Online (Sandbox Code Playgroud)

请注意,async.parallel它与fork我们上面实现的功能完全相同.主要区别在于它将第一个参数传递给错误,D并根据node.js约定将回调传递为第二个参数.

使用promises,我们将其编写如下:

// Assuming A, B & C return a promise instead of accepting a callback

Promise.all([A,B,C]).then(D);
Run Code Online (Sandbox Code Playgroud)

  • "在node.js中没有什么是真正的并行,因为它是单线程的." 不对.不使用CPU的所有内容(例如等待网络I/O)并行运行. (12认同)
  • @Thilo:通常我们将不使用CPU的代码称为不运行.如果你没有跑步,你不能并行"跑". (6认同)
  • @MooGoo:这意味着对于事件,因为我们知道它们肯定不能并行运行,我们不必担心信号量和互斥量,而使用线程我们必须锁定共享资源. (4认同)
  • 大多数情况下都是如此.在Node中等待IO不会阻止其他代码运行,但是当代码运行时,它一次只能运行一个代码.Node中唯一真正的并行执行来自产生子进程,但几乎任何环境都可以说. (3认同)
  • 我是否正确地说这些并不是并行执行的函数,但它们(最好)以不确定的顺序执行,代码没有进展,直到每个'async_func'返回? (2认同)

Wes*_*ble 10

我相信现在"异步"模块提供了这种并行功能,并且与上面的fork函数大致相同.

  • 这是不正确的,async只能帮助您在单个进程中组织代码流. (2认同)
  • async.parallel确实与上面的`fork`函数大致相同 (2认同)

Ran*_*ndy 5

期货模块有一个名为子模块加入,我喜欢用:

将异步调用连接在一起类似于pthread_join线程的工作方式.

自述文件显示了使用自由式或使用Promise模式使用未来子模块的一些很好的例子.来自文档的示例:

var Join = require('join')
  , join = Join()
  , callbackA = join.add()
  , callbackB = join.add()
  , callbackC = join.add();

function abcComplete(aArgs, bArgs, cArgs) {
  console.log(aArgs[1] + bArgs[1] + cArgs[1]);
}

setTimeout(function () {
  callbackA(null, 'Hello');
}, 300);

setTimeout(function () {
  callbackB(null, 'World');
}, 500);

setTimeout(function () {
  callbackC(null, '!');
}, 400);

// this must be called after all 
join.when(abcComplete);
Run Code Online (Sandbox Code Playgroud)