NodeJS,promises,streams - 处理大型CSV文件

alp*_*ogg 11 node.js promise bluebird pg-promise

我需要构建一个处理大型CSV文件的函数,以便在bluebird.map()调用中使用.考虑到文件的潜在大小,我想使用流媒体.

此函数应接受流(CSV文件)和函数(处理流中的块)并在读取文件结束(已解决)或错误(拒绝)时返回承诺.

所以,我从:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}
Run Code Online (Sandbox Code Playgroud)

现在,我有两个相互关联的问题:

  1. 我需要限制正在处理的实际数据量,以免产生内存压力.
  2. 作为processorparam 传递的函数通常是异步的,例如通过基于promise的库(现在:)将文件的内容保存到db pg-promise.因此,它将在记忆中创造一个承诺并反复继续前进.

pg-promise库具有管理它的功能,如page(),但我无法将如何将流事件处理程序与这些promise方法混合在一起.现在,我readable在每个部分之后的处理程序中返回一个promise read(),这意味着我创建了大量的承诺数据库操作并最终因为我达到进程内存限制而出错.

有没有人有一个这样的工作例子,我可以用作跳跃点?

更新:可能有不止一种方法给猫皮肤,但这有效:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}
Run Code Online (Sandbox Code Playgroud)

有人发现这种方法存在潜在问题吗?

Gor*_*sev 8

您可能希望查看promise-streams

var ps = require('promise-streams');
passedStream
  .pipe(csv.parse({trim: true}))
  .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
  .wait().then(_ => {
    console.log("All done!");
  });
Run Code Online (Sandbox Code Playgroud)

适用于背压和一切.


vit*_*y-t 5

在下面找到一个完整的应用程序,该应用程序可以正确执行所需的相同类型的任务:它以流的形式读取文件,将其解析为CSV并将每一行插入数据库。

const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});

const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');

const db = pgp(cn);

function receiver(_, data) {
    function source(index) {
        if (index < data.length) {
            // here we insert just the first column value that contains a prime number;
            return this.none('insert into primes values($1)', data[index][0]);
        }
    }

    return this.sequence(source);
}

db.task(t => {
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
    .then(data => {
        console.log('DATA:', data);
    }
    .catch(error => {
        console.log('ERROR:', error);
    });
Run Code Online (Sandbox Code Playgroud)

请注意,我唯一改变的是:使用library csv-parse代替csv,作为更好的选择。

增加了使用方法stream.readSPEX库,它正确地供应可读与承诺使用流。