Jul*_*ian 3 stream redis node.js
我正在使用fast-csv使用a来迭代CSV文件stream.对于CSV文件的每一行,我想在redis中创建一个作业,我使用kue.解析一行是同步函数.整件事看起来像这样:
var csvStream = fastCsv(_config.csvOptions)
.on('data', function(data) {
var stream = this;
stream.pause();
var payload = parseRow(data, _config);
console.log(payload); // the payload is always printed to the console
var job = kue.create('csv-row', {
payload: payload
})
.save(function(err) {
if (!err) console.log('Enqueued at ' + job.id);
else console.log('Redis error ' + JSON.stringify(err));
stream.resume();
});
})
.on('end', function() {
callback(); // this ends my parsing
});
Run Code Online (Sandbox Code Playgroud)
console.log(payload);我的CSV文件的每一行都有简单的显示,但是没有创建作业.即,回调中的输出save都没有被打印,并且作业不在我的redis中.
我假设,因为它是CSV文件的最后一行,流已经发出end,所以kue.create()在进程终止之前无法执行最后一行?
有没有办法停止流,end直到kue完成?
小智 7
您可以使用异步库解决此问题.您可以将以下模式用于任何流.
var AsyncLib = require('async');
var worker = function (payload, cb) {
//do something with payload and call callback
return cb();
};
var concurrency = 5;
var streamQueue = AsyncLib.queue(worker, concurrency);
var stream = //some readable stream;
stream.on('data', function(data) {
//no need to pause and resume
var payload = '//some payload';
streamQueue.push(payload);
})
.on('end', function() {
//register drain event on end and callback
streamQueue.drain = function () {
callback();
};
});
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2413 次 |
| 最近记录: |