Sri*_*Sri 6 postgresql json node.js express
我试图通过接收来自客户端的请求每个批量插入尝试10,000个记录(使用sequelize.js和bulkCreate())来插入数百万条记录(大约6个字段/列)
这显然是一个坏主意,所以我试着调查 node-pg-copy-streams
但是,我不想在客户端发起更改,其中json数组是这样发送的
# python
data = [
{
"column a":"a values",
"column b":"b values",
},
...
# 10,000 items
...
]
request.post(data=json.dumps(data), url=url)
Run Code Online (Sandbox Code Playgroud)
在nodejs的服务器端,我将如何流式传输request.body以下骨架中的接收?
.post(function(req, res){
// old sequelize code
/* table5.bulkCreate(
req.body, {raw:true}
).then(function(){
return table5.findAll();
}).then(function(result){
res.json(result.count);
});*/
// new pg-copy-streams code
pg.connect(function(err, client, done) {
var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
// My question is here, how would I stream or pipe the request body ?
// ?.on('error', done);
// ?.pipe(stream).on('finish', done).on('error', done);
});
});
Run Code Online (Sandbox Code Playgroud)
这是我解决问题的方法
首先是一个将我的 req.body 字典转换为 TSV 的函数(不是最初问题的一部分)
/**
* Converts a dictionary and set of keys to a Tab Separated Value blob of text
* @param {Dictionary object} dict
* @param {Array of Keys} keys
* @return {Concatenated Tab Separated Values} String
*/
function convertDictsToTSV(dicts, keys){
// ...
}
Run Code Online (Sandbox Code Playgroud)
其次是我原来的 .post 函数的其余部分
.post(function(req, res){
// ...
/* requires 'stream' as
* var stream = require('stream');
* var copyFrom = require('pg-copy-streams').from;
*/
var read_stream_string = new stream.Readable();
read_stream_string.read = function noop() {};
var keys = [...]; // set of dictionary keys to extract from req.body
read_stream_string.push(convertDictsToTSV(req.body, keys));
read_stream_string.push(null);
pg.connect(connectionString, function(err, client, done) {
// ...
// error handling
// ...
var copy_string = 'Copy tablename (' + keys.join(',') + ') FROM STDIN'
var pg_copy_stream = client.query( copyFrom( copy_string ) );
read_stream_string.pipe(pg_copy_stream).on('finish', function(finished){
// handle finished and done appropriately
}).on('error', function(errored){
// handle errored and done appropriately
});
});
pg.end();
});
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2099 次 |
| 最近记录: |