Node.js Streams on(end) 在异步 on(readable) 完成之前完成

Dav*_*vid 5 asynchronous stream mongoose mongodb node.js

我正在使用 Node.js 请求库和 node-feedparser 来捕获提要,并将其发布到使用 Mongoose 的 MongoDB 数据库中。

我将帖子信息存储到帖子集合中,将提要信息存储到提要集合中,但我需要将 post._id 存储在提要集合中名为 feeds._post 的数组中。

我遇到的问题是使用流接口,在所有 feedparser.on('readable') 对数据库的异步调用完成之前调用 feedparser.on('end'),因此我最终得到假设 Post 集合中有 15 个帖子,而 Feed._post 数组中只有 11 个 post._id。

我知道如果这只是普通的 JavaScript,我可以使用 async 来确保 .on('end') 等待所有 .on('readable') 完成,但是我该如何处理流?

提前致谢。

db.Feed.findById(feedid, function(error, feed) {

// request.on('response') -> this.pipe(feedparser)

  feedparser.on('readable', function() {
    var post;
    while (null !== (post = this.read())) {
      db.Post.create({ /* post details */ }, function(err, post) {
        feed._post.push(post);
      });
    }
  });

  feedparser.on('end', function() {
    feed.save();
  });

});
Run Code Online (Sandbox Code Playgroud)

aem*_*bke 3

您需要跟踪计数器和布尔值。当“可读”事件首次触发时递增计数器,并在将其保存到数据库后递减计数器。布尔值以一种状态开始,并在“结束”事件触发时切换。例如:

 var processing = 0, done = false;

 var finished = function(){
     if(processing === 0 && done){
         feed.save();
         // ... other stuff
     }
 };

 feedparser.on("readable", function(){
     processing++;
     doStuff(something, function(){
         // something asynchronous         
         processing--;
         finished();
     });
 });

 feedparser.on("end", function(){
     done = true;
     finished();
 });
Run Code Online (Sandbox Code Playgroud)