我正在使用csv-to-json(一个简洁的库来处理CSV文件)。
我有一个用例,其中我需要处理大型(> 200万行)CSV并将其插入数据库。
为了做到这一点而不会遇到内存问题,我打算将CSV作为流进行处理,每10000行暂停该流,将这些行插入DB中,然后恢复该流。
出于某种原因,我似乎无法pause
接受。
以下面的代码为例:
const rs = fs.createReadStream("./foo.csv");
rs.pause();
let count = 0;
csv()
.fromStream(rs)
.on("json", (json) => {
count++;
console.log(count);
})
.on("done", () => {
cb(null, count);
})
.on("error", (err) => {
cb(err);
})
Run Code Online (Sandbox Code Playgroud)
count
被记录了200次(这就是我的CSV中有多少行)-我期望它不记录任何内容,因为在将流传递给之前将其暂停 fromStream()
这是库的创建者建议的解决方案,在此问题中进行了跟踪:
var tmpArr=[];
rs.pipe(csv({},{objectMode:true})).pipe(new Writable({
write: function(json, encoding,callback){
tmpArr.push(json);
if (tmpArr.length===10000){
myDb.save(tmpArr,function(){
tmpArr=[];
callback();
})
}else{
callback();
}
} ,
objectMode:true
}))
.on('finish',function(){
if (tmpArr.length>0){
myDb.save(tmpArr,function(){
tmpArr=[];
})
}
})
Run Code Online (Sandbox Code Playgroud)
我实际上已经设法通过像这样取消管道来模拟暂停,但这并不理想:
let count = 0;
var csvParser=csv()
.fromStream(rs)
.on("json", (json) => {
rows.push(json);
if (rows.length % 1000 === 0) {
rs.unpipe();
// clear `rows` right after `unpipe`
const entries = rows;
rows = [];
this._insertEntries(db, entries, ()=> {
rs.pipe(csvParser);
});
}
})
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
5032 次 |
最近记录: |