nodejs 在 createReadStream 中异步等待

Pri*_*rua 1 node.js async-await nodejs-stream

我正在逐行读取 CSV 文件并在 MongoDB 中插入/更新。预期的输出将是 1。 console.log(row); 2. console.log(光标); 3.console.log("流");

但是得到像 1.console.log(row); 这样的输出。控制台日志(行);控制台日志(行);控制台日志(行);控制台日志(行);…………………… 2. console.log(cursor); 3.console.log("流"); 请让我知道我在这里缺少什么。

const csv = require('csv-parser');
const fs = require('fs');

var mongodb = require("mongodb");

var client = mongodb.MongoClient;
var url = "mongodb://localhost:27017/";
var collection;
client.connect(url,{ useUnifiedTopology: true }, function (err, client) {

  var db = client.db("UKCompanies");
  collection = db.collection("company");
  startRead();
});
var cursor={};

async function insertRec(row){
  console.log(row);
  cursor = await collection.update({CompanyNumber:23}, row, {upsert: true});
  if(cursor){
    console.log(cursor);
  }else{
    console.log('not exist')
  }
  console.log("stream");
}



async function startRead() {
  fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      await insertRec(row);
    })
    .on('end', () => {
      console.log('CSV file successfully processed');
    });
}
Run Code Online (Sandbox Code Playgroud)

jfr*_*d00 10

在您的startRead()函数中,await insertRec()不会datainsertRec()处理时阻止更多事件的流动。因此,如果您不希望下一个data事件在完成之前运行insertRec(),您需要暂停,然后恢复流。

async function startRead() {
  const stream = fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      try {
        stream.pause();
        await insertRec(row);
      } finally {
        stream.resume();
      }
    })
    .on('end', () => {
      console.log('CSV file successfully processed');
    });
}
Run Code Online (Sandbox Code Playgroud)

仅供参考,如果insertRec()失败,您还需要一些错误处理。

  • 流暂停/恢复是非常有问题的并且很少起作用 (2认同)

Ale*_*x K 5

Node 10+ ReadableStream 获得了Symbol.asyncIterator属性,它允许使用for-await-of处理流

async function startRead() {
    const readStream = fs.createReadStream('./data/inside/6.csv');    
    
    for await (const row of readStream.pipe(csv())) {
        await insertRec(row);
    }

    console.log('CSV file successfully processed');
}
Run Code Online (Sandbox Code Playgroud)