如何将 JSON 数组从 Node.js 流式传输到 PostgreSQL

Sri*_*Sri 6 postgresql json node.js express

我正在尝试插入数百万条记录(每条记录约 6 个字段/列):客户端每次请求发送 10,000 条记录,服务器端用 sequelize.js 的 bulkCreate() 对每批记录做一次批量插入。

这显然不是个好办法,所以我开始研究 node-pg-copy-streams。

但是,我不想改动客户端;客户端目前是这样发送 JSON 数组的:

# python
# Client-side example from the question: the whole batch is sent as a single
# JSON array in one POST request.
# NOTE(review): the list literal is abbreviated with `...` placeholders and is
# not runnable as written (two adjacent `...` with no comma is a SyntaxError).
data = [
    {
     "column a":"a values",
     "column b":"b values",
    },
    ...
    # 10,000 items
    ...
]
# NOTE(review): `request` is presumably the `requests` library (requests.post).
# Passing a pre-serialized string via data= presumably sends no
# "Content-Type: application/json" header; confirm that the server's JSON body
# parsing still populates req.body.
request.post(data=json.dumps(data), url=url)
Run Code Online (Sandbox Code Playgroud)

在 Node.js 服务器端,我该如何在下面的骨架代码中,把接收到的 request.body 以流的方式写入数据库?

.post(function(req, res){
    // Route handler skeleton from the question: the previous Sequelize
    // bulkCreate() path is kept commented out for reference, and the
    // pg-copy-streams path below is still unfinished.
    // NOTE(review): req.body is presumably the already-parsed JSON array sent
    // by the client — confirm which body parser populates it.

    // old sequelize code
    // NOTE(review): findAll() presumably resolves to an array, in which case
    // result.count would be undefined — confirm before reviving this code.
    /* table5.bulkCreate(
        req.body, {raw:true}
    ).then(function(){
        return table5.findAll();
    }).then(function(result){
        res.json(result.count);
    });*/

    // new pg-copy-streams code
    // Obtain a client; `done` is presumably node-pg's callback for releasing
    // the client back to its pool.
    pg.connect(function(err, client, done) {
    // Start a COPY ... FROM STDIN; the returned `stream` is the writable end
    // that receives the row data.
    // NOTE(review): without a FORMAT option COPY expects PostgreSQL's text
    // format (tab-separated rows), so the JSON body would need converting
    // before it is piped in — the answer below does exactly that.
    var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
    // My question is here, how would I stream or pipe the request body ?
    // ?.on('error', done);
    // ?.pipe(stream).on('finish', done).on('error', done);
    });
});
Run Code Online (Sandbox Code Playgroud)

Sri*_*Sri 3

这是我解决问题的方法

首先是一个函数,用于把 req.body 中的字典数组转换为 TSV(制表符分隔值)文本(这部分不属于原问题):

/**
 * Converts an array of row objects into PostgreSQL COPY text-format data:
 * tab-separated columns, one newline-terminated line per row.
 *
 * Field values are escaped the way COPY's text format requires: backslash,
 * tab, newline and carriage return are backslash-escaped, and
 * null/undefined (including a key the row does not own) becomes `\N`, which
 * COPY reads as SQL NULL. Dates are written as ISO-8601 strings; other
 * objects are serialized with JSON.stringify; everything else with String().
 *
 * @param {Array<Object>} dicts - rows to convert, one plain object per row
 * @param {Array<string>} keys - property names to extract, in column order
 * @returns {string} COPY text-format data ('' when `dicts` is empty)
 */
function convertDictsToTSV(dicts, keys){
    const hasOwn = Object.prototype.hasOwnProperty;

    // Turn a single value into one escaped COPY text-format field.
    const escapeField = (value) => {
        if (value === null || value === undefined) {
            return '\\N';
        }
        let text;
        if (value instanceof Date) {
            text = value.toISOString();
        } else if (typeof value === 'object') {
            text = JSON.stringify(value);
        } else {
            text = String(value);
        }
        // Backslash must be escaped first so the escapes added after it are
        // not themselves doubled.
        return text
            .replace(/\\/g, '\\\\')
            .replace(/\t/g, '\\t')
            .replace(/\n/g, '\\n')
            .replace(/\r/g, '\\r');
    };

    return dicts
        .map((dict) => {
            // Only own properties count: a key such as "toString" must not
            // pick up an inherited method.
            const fields = keys.map((key) => escapeField(hasOwn.call(dict, key) ? dict[key] : undefined));
            return `${fields.join('\t')}\n`;
        })
        .join('');
}
Run Code Online (Sandbox Code Playgroud)

其次是原来 .post 处理函数的其余部分:

.post(function(req, res){
    // ...
    /* requires 'stream' as 
     * var stream = require('stream');
     * var copyFrom = require('pg-copy-streams').from;
     */
    var read_stream_string = new stream.Readable();
    read_stream_string.read = function noop() {};
    var keys = [...]; // set of dictionary keys to extract from req.body 
    read_stream_string.push(convertDictsToTSV(req.body, keys));
    read_stream_string.push(null);
    pg.connect(connectionString, function(err, client, done) {
        // ...
        // error handling
        // ...
        var copy_string = 'Copy tablename (' + keys.join(',') + ') FROM STDIN'
        var pg_copy_stream = client.query( copyFrom( copy_string ) );
        read_stream_string.pipe(pg_copy_stream).on('finish', function(finished){
            // handle finished and done appropriately
        }).on('error', function(errored){
            // handle errored and done appropriately
        });
    });
    pg.end();
});
Run Code Online (Sandbox Code Playgroud)