异步队列,文件流结束时如何知道两者完成

use*_*720 10 javascript callback stream node.js async.js

将async.queue与文件流一起使用时,我遇到了一个小问题

  1. 我有一个场景,我的文件流将完成
  2. 我将fileRead设置为true
  3. 但是队列将是空的并且已经被称为排水
  4. 这导致我的"完成"永远不会被召唤

在我的文件流"结束"并且队列为空之后说"结束队列"的正确方法是什么?

var fs = require('fs')
    , util = require('util')
    , stream = require('stream')
    , es = require('event-stream');

var async = require('async');

var fileRead = false;
var lineNr = 0;

var q = async.queue(function(task, callback) {
    task(function(err, lineData){
        responseLines.push(lineData);
        callback();
      });
  }, 5);

var q.drain = function() {
  if(fileRead){
    done(null, responseLines);
  }
}

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){
        s.pause();
        q.push(async.apply(insertIntoDb, line))
        s.resume();
    })
    .on('error', function(err){
       done(err);
    })
    .on('end', function(){
      fileRead = true;
    })
);
Run Code Online (Sandbox Code Playgroud)

或者是否有更好的使用异步,这将允许我这样做?异步处理逐行,如果其中一行有错误,则能够提前退出

lee*_*sio 1

首先,我不确定你的代码片段有多少是伪代码,但var q.drain = ...不是有效的 JavaScript,应该出错。它应该就像q.drain =您在现有对象上定义属性而不声明新变量一样。这可能就是为什么如果不是伪代码,您的排出函数不会触发的原因。

有几种方法可以实现我认为您想要做的事情。一种方法是检查最终处理程序中队列的长度,如果仍有项目需要处理,则设置排出函数。

.on('end', function(){
  if(!q.length){
    callDone();
  }
  else {
    q.drain = callDone;
  }
});

function callDone(){
  done(null, responseLines);
}
Run Code Online (Sandbox Code Playgroud)

这实际上是在说“如果队列已被处理,则调用完成,如果没有,则在完成时调用完成!” 我确信有很多方法可以整理您的代码,但希望这能为您的特定问题提供解决方案。