在NodeJS中将大型csv文件(200'000行+)插入MongoDB

hxm*_*xmn 5 csv parsing mongodb node.js async.js

我正在尝试解析并将一个大的csv文件插入到MongoDB中,但是当文件扩展到100'000行时,我从服务器得到了错误的响应.我需要插入的文件通常超过200'000行.

我已经尝试了批量插入(insertMany)和Babyparse(Papaparse)流方法来逐行插入文件.但结果不佳.

节点api:

router.post('/csv-upload/:id', multipartMiddleware, function(req, res) {

    // Post vartiables
    var fileId = req.params.id;
    var csv = req.files.files.path;

    // create a queue object with concurrency 5
    var q = async.queue(function(row, callback) {
        var entry = new Entry(row);
        entry.save();
        callback();
    }, 5);

    baby.parseFiles(csv, {
        header: true, // Includes header in JSON
        skipEmptyLines: true,
        fastMode: true,
        step: function(results, parser) {
            results.data[0].id = fileId;

            q.push(results.data[0], function (err) {
                if (err) {throw err};
            });
        },
        complete: function(results, file) {
            console.log("Parsing complete:", results, file);
            q.drain = function() {
                console.log('All items have been processed');
                res.send("Completed!");
            };
        }
    });
});
Run Code Online (Sandbox Code Playgroud)

此流式传输方法导致:POST SERVER net :: ERR_EMPTY_RESPONSE

不确定我是否正确使用async.queue.

是否有更好,更有效的方法来做这件事还是我做错了什么?

Express Server:

// Dependencies
var express = require('express');
var path = require('path');
var bodyParser = require('body-parser');
var routes = require('./server/routes');
var mongoose = require("mongoose");
var babel = require("babel-core/register");
var compression = require('compression');
var PORT = process.env.PORT || 3000;
// Include the cluster module
var cluster = require('cluster');

mongoose.connect(process.env.MONGOLAB_URI || 'mongodb://localhost/routes');

  // Code to run if we're in the master process
 if (cluster.isMaster) {

    // Count the machine's CPUs
    var cpuCount = require('os').cpus().length;

    // Create a worker for each CPU
    for (var i = 0; i < cpuCount; i += 1) {
        cluster.fork();
    }

 // Code to run if we're in a worker process
 } else {
    // Express
    var app = express();

    app.use(bodyParser.json({limit: '50mb'}));
    app.use(bodyParser.urlencoded({limit: '50mb', extended: true}));

    // Compress responses
    app.use(compression());

    // Used for production build
    app.use(express.static(path.join(__dirname, 'public')));

    routes(app);

    // Routes
    app.use('/api', require('./server/routes/api'));

    app.all('/*', function(req, res) {
        res.sendFile(path.join(__dirname, 'public/index.html'));
    });

    // Start server
    app.listen(PORT, function() {
        console.log('Server ' + cluster.worker.id + ' running on ' + PORT);
    });
}
Run Code Online (Sandbox Code Playgroud)

use*_*572 5

处理导入:

很好的问题,根据我的经验,迄今为止将 csv 插入 mongo 的最快方法是通过命令行:

mongoimport -d db_name -c collection_name --type csv --file file.csv --headerline 
Run Code Online (Sandbox Code Playgroud)

我不相信 mongoose 有办法调用 mongoimport (如果我错了,有人纠正我)

但直接通过节点调用很简单:

var exec = require('child_process').exec;
var cmd = 'mongoimport -d db_name -c collection_name --type csv --file file.csv --headerline';

exec(cmd, function(error, stdout, stderr) {
  // do whatever you need during the callback
});
Run Code Online (Sandbox Code Playgroud)

上面的内容必须修改为动态的,但它应该是不言自明的。

处理上传:

从前端客户端上传文件是另一个挑战。

如果您向服务器发出请求并且在 60 秒内没有得到响应(可能就是您上面提到的),大多数浏览器都会超时

一种解决方案是打开套接字连接(在 npm 中搜索 socket.io)以获取详细信息。这将创建与服务器的持续连接,并且不受超时限制。

如果上传不是问题,并且超时是由于解析/插入缓慢造成的,那么一旦实现上述操作,您可能就不必担心这个问题。

其他考虑因素:

我不确定您到底需要发送回用户什么,或者需要进行什么解析。但这可以在正常请求/响应周期之外完成,也可以在套接字连接期间处理(如果在一个请求/响应周期期间需要)。