流处理完成后如何确保异步代码执行?

mod*_*tos 5 javascript asynchronous node.js promise node.js-stream

我有我通过侦听处理流data,errorend事件,我调用一个函数来处理每一个data第一流中的事件.当然,处理数据的函数会调用其他回调,使其异步.那么当处理流中的数据时,如何开始执行更多代码?end在流中侦听事件并不意味着异步data处理功能已完成.

当我执行下一个语句时,如何确保完成流数据处理功能?

这是一个例子:

function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) {
  var self = this;
  var promises = [];
  accountStream
    .on('data', function (account) {
      migrateAccount.bind(self)(account, finishMigration);
    })
    .on('error', function (err) {
      return console.log(err);
    })
    .on('end', function () {
      console.log("Finished updating account stream (but finishMigration is still running!!!)");
      callThisOnlyAfterAllAccountsAreMigrated() // finishMigration is still running!
    });
}

var migrateAccount = function (oldAccount, callback) {
  executeSomeAction(oldAccount, function(err, newAccount) {
    if (err) return console.log("error received:", err);
    return callback(newAccount);
  });
}

var finishMigration = function (newAccount) {
  // some code that is executed asynchronously...
}
Run Code Online (Sandbox Code Playgroud)

如何确保callThisOnlyAfterAllAccountsAreMigrated在处理流后调用?

这可以用承诺来完成吗?它可以通过流完成吗?我正在使用Nodejs,因此引用其他npm模块可能会有所帮助.

Bra*_*rad 2

正如您所说,监听end流上的事件本身是没有用的。流不知道也不关心您正在处理处理程序中的数据data,因此您需要编写一些代码来跟踪您自己的 migrateAccount 状态。

如果是我,我会重写这整个部分。如果您在流readable中使用该事件.read(),则可以一次阅读任意数量的项目。如果是这样的话,没问题。如果是30就好了 这样做的原因是这样您就不会超出正在处理来自流的数据的任何内容。按现在的情况,如果 accountStream 很快,您的应用程序无疑会在某个时候崩溃。

当您从流中读取一个项目并开始工作时,获取您返回的承诺(使用 Bluebird 或类似的)并将其放入数组中。当承诺得到解决后,将其从数组中删除。当流结束时,附加一个.done()处理程序.all()(基本上是从数组中的每个承诺中做出一个大承诺)。

您还可以使用一个简单的计数器来记录正在进行的作业。