createReadStream end 在数据处理完成之前触发

use*_*780 6 javascript csv stream fs node.js

我正在尝试执行以下操作:

  1. 逐行流式传输 csv 文件。
  2. 修改每一行中包含的数据。
  3. 流式传输和处理所有行后,完成并继续下一个任务。

问题是.on("end").on("data")处理完每一行之前触发。处理完所有行.on("end")后如何触发.on("data")

下面是我正在谈论的一个简单的例子:

import parse from 'csv-parse'; 

var parser = parse({});

fs.createReadStream(this.upload.location)
.pipe(parser)
.on("data", line => {
  var num = Math.floor((Math.random() * 100) + 1);
  num = num % 3;
  num = num * 1000;
  setTimeout( () => { 
    console.log('data process complete');
  }, num);
})
.on("end", () => {
   console.log('Done: parseFile');
   next(null);
});
Run Code Online (Sandbox Code Playgroud)

提前致谢。

小智 0

我认为问题是事件监听器setTimeout中的(或任何其他异步任务)dataend之后触发data,但异步任务导致它即使在流触发后也记录消息end

如果你取出 ,setTimeout那么你会看到它记录了 data before 中的所有消息end。您仍然可以执行异步任务,但在流结束后可能会运行一批异步任务。

这段代码有助于解释发生了什么:

const fs = require('fs')

const testFileName = 'testfile.txt'

fs.writeFileSync(testFileName, '123456789')

let count = 0
const readStream = fs.createReadStream(testFileName, {
  encoding: 'utf8',
  highWaterMark: 1  // low highWaterMark so we can have more chunks to observe
})
readStream.on('data', (data) => {
  console.log('+++++++++++processing sync+++++++++++++')
  console.log(data)
  console.log('+++++++++++end processing sync+++++++++++++')
  setTimeout(() => {
    console.log('-----------processing async-------------')
    console.log(data)
    console.log('-----------end processing async-------------')
  }, ++count * 1000)
})
readStream.on('end', () => {
  console.log('stream ended but still have async tasks doing their thing')
  fs.unlinkSync(testFileName)
})
Run Code Online (Sandbox Code Playgroud)