use*_*720 10 javascript callback stream node.js async.js
将async.queue与文件流一起使用时,我遇到了一个小问题
在我的文件流"结束"并且队列为空之后说"结束队列"的正确方法是什么?
var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');
var async = require('async');
var fileRead = false;
var lineNr = 0;
var q = async.queue(function(task, callback) {
task(function(err, lineData){
responseLines.push(lineData);
callback();
});
}, 5);
var q.drain = function() {
if(fileRead){
done(null, responseLines);
}
}
var s = fs.createReadStream('very-large-file.csv')
.pipe(es.split())
.pipe(es.mapSync(function(line){
s.pause();
q.push(async.apply(insertIntoDb, line))
s.resume();
})
.on('error', function(err){
done(err);
})
.on('end', function(){
fileRead = true;
})
);Run Code Online (Sandbox Code Playgroud)
或者是否有更好的使用异步,这将允许我这样做?异步处理逐行,如果其中一行有错误,则能够提前退出
首先,我不确定你的代码片段有多少是伪代码,但var q.drain = ...不是有效的 JavaScript,应该出错。它应该就像q.drain =您在现有对象上定义属性而不声明新变量一样。这可能就是为什么如果不是伪代码,您的排出函数不会触发的原因。
有几种方法可以实现我认为您想要做的事情。一种方法是检查最终处理程序中队列的长度,如果仍有项目需要处理,则设置排出函数。
.on('end', function(){
if(!q.length){
callDone();
}
else {
q.drain = callDone;
}
});
function callDone(){
done(null, responseLines);
}
Run Code Online (Sandbox Code Playgroud)
这实际上是在说“如果队列已被处理,则调用完成,如果没有,则在完成时调用完成!” 我确信有很多方法可以整理您的代码,但希望这能为您的特定问题提供解决方案。
| 归档时间: |
|
| 查看次数: |
590 次 |
| 最近记录: |