在 Node.js 中处理大型 CSV 上传

Tsa*_*mba 7 javascript asynchronous node.js

根据这里的先前线程:

节点异步循环 - 如何使此代码按顺序运行?

...我正在寻找有关处理大型数据上传文件的更广泛的建议。

设想:

用户上传了一个非常大的 CSV 文件,其中包含数十万到数百万行。它使用 multer 流式传输到端点:

const storage = multer.memoryStorage();
const upload = multer({ storage: storage });

router.post("/", upload.single("upload"), (req, res) => {
    //...
});
Run Code Online (Sandbox Code Playgroud)

每一行都被转换成一个 JSON 对象。然后将该对象映射到几个较小的对象中,这些对象需要插入到几个不同的表中,分布在各种微服务容器中并由其访问。

async.forEachOfSeries(data, (line, key, callback) => {
    let model = splitData(line);
    //save model.record1, model.record2, etc. sequentially
});
Run Code Online (Sandbox Code Playgroud)

很明显,使用这种方法我会遇到内存限制。这样做的最有效方式是什么?

Nik*_*des 10

为了避免内存问题,您需要使用处理文件- 简单地说,增量。不是将整个文件加载到内存中,而是读取每一,它会得到相应的处理,然后在符合垃圾收集条件后立即进行处理。

在 Node 中,您可以结合使用CSV 流解析器将二进制内容作为 CSV 行和through2(一种允许您控制的流的流实用程序)的组合来实现;在这种情况下,暂时暂停它以允许将行保存在数据库中。

过程

该过程如下:

  • 您获取数据流
  • 您通过 CSV 解析器传输它
  • 你通过一个 through2 管道它
  • 您将每一行保存在数据库中
  • 完成保存后,请致电cb()以继续下一个项目。

我不熟悉,multer但这里有一个使用来自文件的流的示例。

const fs = require('fs')
const csv = require('csv-stream')
const through2 = require('through2')

const stream = fs.createReadStream('foo.csv')
  .pipe(csv.createStream({
      endLine : '\n',
      columns : ['Year', 'Make', 'Model'],
      escapeChar : '"',
      enclosedChar : '"'
  }))
  .pipe(through2({ objectMode: true }, (row, enc, cb) => {
    // - `row` holds the first row of the CSV,
    //   as: `{ Year: '1997', Make: 'Ford', Model: 'E350' }`
    // - The stream won't process the *next* item unless you call the callback
    //  `cb` on it.
    // - This allows us to save the row in our database/microservice and when
    //   we're done, we call `cb()` to move on to the *next* row.
    saveIntoDatabase(row).then(() => {
      cb(null, true)
    })
    .catch(err => {
      cb(err, null)
    })
  }))
  .on('data', data => {
    console.log('saved a row')
  })
  .on('end', () => {
    console.log('end')
  })
  .on('error', err => {
    console.error(err)
  })

// Mock function that emulates saving the row into a database,
// asynchronously in ~500 ms
const saveIntoDatabase = row =>
  new Promise((resolve, reject) =>
    setTimeout(() => resolve(), 500))
Run Code Online (Sandbox Code Playgroud)

示例foo.csvCSV 是这样的:

1997,Ford,E350
2000,Mercury,Cougar
1998,Ford,Focus
2005,Jaguar,XKR
1991,Yugo,LLS
2006,Mercedes,SLK
2009,Porsche,Boxter
2001,Dodge,Viper
Run Code Online (Sandbox Code Playgroud)

为什么?

这种方法避免了必须在内存中加载整个 CSV。一旦 arow被处理,它就会超出范围/变得无法访问,因此它有资格进行垃圾收集。这就是使这种方法如此有效的内存的原因。理论上,这允许您处理无限大小的文件。阅读流手册的更多信息,请。

一些技巧

  • 您可能希望每个周期保存/处理超过 1 行(以相同大小的)。在这种情况下,将一些rows推入一个 Array,处理/保存整个 Array(块),然后调用cb移动到下一个块 - 重复该过程。
  • 流发出您可以监听的事件。该end/error事件是用于响应回操作是否成功或失败的特别有用的。
  • Express 默认使用流 - 我几乎可以肯定您根本不需要multer

  • 以上或多或少说明了这个概念。它需要调整 - 我要做的第一件事是在“Array”中累积 5000-10000 行,并在可能的情况下一次性保存/处理该块。当然,每一行都接触数据库的效率非常低。 (2认同)