暂停Node.js中的可读流

Nik*_*des 5 stream node.js

我正在使用csv-to-json(一个简洁的库来处理CSV文件)。

我有一个用例,其中我需要处理大型(> 200万行)CSV并将其插入数据库。

为了做到这一点而不会遇到内存问题,我打算将CSV作为流进行处理,每10000行暂停该流,将这些行插入DB中,然后恢复该流。

出于某种原因,我似乎无法pause接受。

以下面的代码为例:

const rs = fs.createReadStream("./foo.csv");
rs.pause();

let count = 0;

csv()
.fromStream(rs)
.on("json", (json) => {
  count++;
  console.log(count);
})
.on("done", () => {
  cb(null, count);
})
.on("error", (err) => {
  cb(err);
})
Run Code Online (Sandbox Code Playgroud)

count 被记录了200次(这就是我的CSV中有多少行)-我期望它不记录任何内容,因为在将流传递给之前将其暂停 fromStream()

Nik*_*des 5

这是库的创建者建议的解决方案,在此问题中进行了跟踪:

var tmpArr=[];
rs.pipe(csv({},{objectMode:true})).pipe(new Writable({
  write: function(json, encoding,callback){
    tmpArr.push(json);
    if (tmpArr.length===10000){
      myDb.save(tmpArr,function(){
        tmpArr=[];
        callback();
      })
    }else{
      callback();
    }
  } ,
  objectMode:true
}))
.on('finish',function(){
  if (tmpArr.length>0){
    myDb.save(tmpArr,function(){
      tmpArr=[];
    })
  }
})
Run Code Online (Sandbox Code Playgroud)

我实际上已经设法通过像这样取消管道来模拟暂停,但这并不理想:

let count = 0;
var csvParser=csv()
.fromStream(rs)
.on("json", (json) => {
  rows.push(json);
  if (rows.length % 1000 === 0) {
    rs.unpipe();
    // clear `rows` right after `unpipe`
    const entries = rows;
    rows = [];
    this._insertEntries(db, entries, ()=> {
      rs.pipe(csvParser);
    });
  }
})
Run Code Online (Sandbox Code Playgroud)