如何等待流完成管道?(的NodeJS)

The*_*ter 28 asynchronous pipe node.js promise pdftotext

我有一个for循环数组,所以我使用Promise.all来完成它们然后调用它们.

let promises = [];
promises.push(promise1);
promises.push(promise2);
promises.push(promise3);

Promise.all(promises).then((responses) => {
  for (let i = 0; i < promises.length; i++) {
    if (promise.property === something) {
      //do something
    } else {
      let file = fs.createWriteStream('./hello.pdf');
      let stream = responses[i].pipe(file);
      /*
         I WANT THE PIPING AND THE FOLLOWING CODE 
         TO RUN BEFORE NEXT ITERATION OF FOR LOOP
      */
      stream.on('finish', () => {
        //extract the text out of the pdf
        extract(filePath, {splitPages: false}, (err, text) => {
        if (err) {
          console.log(err);
        } else {
          arrayOfDocuments[i].text_contents = text;
        }
      });
    });    
  }
}
Run Code Online (Sandbox Code Playgroud)

promise1,promise2和promise3是一些http请求,如果其中一个是application/pdf,那么我将它写入流并从中解析文本.但是这段代码在解析pdf中的测试之前会运行下一次迭代.有没有办法让代码等到流到管道和提取的管道完成后再继续下一次迭代?

bkn*_*hts 14

像下面这样的东西也会起作用.我经常使用这种模式:

let promises = [];
promises.push(promise1);
promises.push(promise2);
promises.push(promise3);

function doNext(){
  if(!promises.length) return;
  promises.shift().then((resolved) =>{
    if(resolved.property === something){
      ...
      doNext();
    }else{
      let file = fs.createWriteStream('./hello.pdf');
      let stream = resolved.pipe(file);
      stream.on('finish', () =>{
        ...
        doNext();
      });
    }

  })
}
doNext();
Run Code Online (Sandbox Code Playgroud)

或者将处理程序分解为控制器和Promisified处理程序:

function streamOrNot(obj){
  return new Promise(resolve, reject){
    if(obj.property === something){
      resolve();
      return;
    }
    let file = fs.createWriteStream...;
    stream.on('finish', () =>{
      ...
      resolve();
    });
  }
}

function doNext(){
  if(!promises.length) return;
  return promises.shift().then(streamOrNot).then(doNext);
}

doNext()
Run Code Online (Sandbox Code Playgroud)

  • 不监听流的错误事件不是一个好习惯。 (2认同)

Hai*_*han 14

没有异步/等待,它是非常讨厌的.使用async/await,只需执行以下操作:

Promise.all(promises).then(async (responses) => {
  for (...) {
    await new Promise(fulfill => stream.on("finish", fulfill));
    //extract the text out of the PDF
  }
})
Run Code Online (Sandbox Code Playgroud)

  • 其他提示:仅当调用者正确处理流时,`finish`事件才会触发。如果没有(例如,AWS SDK S3上传),则可以使用close事件来避免await永远坐在那里。 (3认同)

Chr*_*use 12

使用awaitwithstream.pipeline()代替stream.pipe()

import * as StreamPromises from "stream/promises";

...
await StreamPromises.pipeline(sourceStream, destinationStream);
Run Code Online (Sandbox Code Playgroud)

  • 遗憾的是,这只是 Node 16+,我们这些仍然需要 14 的人必须承诺管道方法。 (2认同)

Oxi*_*Oxi 4

您可以将 else 部分编写在自调用函数内。这样流的处理就会并行发生

(function(i) {
    let file = fs.createWriteStream('./hello.pdf');
    let stream = responses[i].pipe(file);
  /*
     I WANT THE PIPING AND THE FOLLOWING CODE 
     TO RUN BEFORE NEXT ITERATION OF FOR LOOP
  */
    stream.on('finish', () => {
      //extract the text out of the pdf
      extract(filePath, {splitPages: false}, (err, text) => {
      if (err) {
        console.log(err);
      } 
      else {
        arrayOfDocuments[i].text_contents = text;
      }
    });
  });    
})(i) 
Run Code Online (Sandbox Code Playgroud)

否则,您可以将流媒体部分作为原始/个人承诺本身的一部分来处理。

截至目前,您正在创建 Promise 并将其添加到数组中,而不是将 Promise.then 添加到数组中(这也是一个 Promise)。然后在处理程序内部进行流处理。